Whamcloud - gitweb
46f0ef38c6fcf151320896ecc89de54e60886b7a
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/init.h>
38 #include <linux/obd_class.h>
39 #include <linux/random.h>
40 #include <linux/fs.h>
41 #include <linux/jbd.h>
42 #include <linux/ext3_fs.h>
43 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
44 # include <linux/smp_lock.h>
45 # include <linux/buffer_head.h>
46 # include <linux/workqueue.h>
47 # include <linux/mount.h>
48 #else
49 # include <linux/locks.h>
50 #endif
51 #include <linux/obd_lov.h>
52 #include <linux/lustre_mds.h>
53 #include <linux/lustre_fsfilt.h>
54 #include <linux/lprocfs_status.h>
55 #include <linux/lustre_commit_confd.h>
56
57 #include "mds_internal.h"
58
59 static int mds_intent_policy(struct ldlm_namespace *ns,
60                              struct ldlm_lock **lockp, void *req_cookie,
61                              ldlm_mode_t mode, int flags, void *data);
62 static int mds_postsetup(struct obd_device *obd);
63 static int mds_cleanup(struct obd_device *obd, int flags);
64
65 /* Assumes caller has already pushed into the kernel filesystem context */
66 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
67                         loff_t offset, int count)
68 {
69         struct ptlrpc_bulk_desc *desc;
70         struct l_wait_info lwi;
71         struct page **pages;
72         int rc = 0, npages, i, tmpcount, tmpsize = 0;
73         ENTRY;
74
75         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
76
77         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
78         OBD_ALLOC(pages, sizeof(*pages) * npages);
79         if (!pages)
80                 GOTO(out, rc = -ENOMEM);
81
82         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
83                                     MDS_BULK_PORTAL);
84         if (desc == NULL)
85                 GOTO(out_free, rc = -ENOMEM);
86
87         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
88                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
89
90                 pages[i] = alloc_pages(GFP_KERNEL, 0);
91                 if (pages[i] == NULL)
92                         GOTO(cleanup_buf, rc = -ENOMEM);
93
94                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
95         }
96
97         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
98                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
99                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
100                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
101                        file->f_dentry->d_inode->i_size);
102
103                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
104                                      kmap(pages[i]), tmpsize, &offset);
105                 kunmap(pages[i]);
106
107                 if (rc != tmpsize)
108                         GOTO(cleanup_buf, rc = -EIO);
109         }
110
111         LASSERT(desc->bd_nob == count);
112
113         rc = ptlrpc_start_bulk_transfer(desc);
114         if (rc)
115                 GOTO(cleanup_buf, rc);
116
117         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
118                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
119                        OBD_FAIL_MDS_SENDPAGE, rc);
120                 GOTO(abort_bulk, rc);
121         }
122
123         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
124         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
125         LASSERT (rc == 0 || rc == -ETIMEDOUT);
126
127         if (rc == 0) {
128                 if (desc->bd_success &&
129                     desc->bd_nob_transferred == count)
130                         GOTO(cleanup_buf, rc);
131
132                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
133         }
134
135         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
136                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
137                   desc->bd_nob_transferred, count,
138                   req->rq_export->exp_client_uuid.uuid,
139                   req->rq_export->exp_connection->c_remote_uuid.uuid);
140
141         ptlrpc_fail_export(req->rq_export);
142
143         EXIT;
144  abort_bulk:
145         ptlrpc_abort_bulk (desc);
146  cleanup_buf:
147         for (i = 0; i < npages; i++)
148                 if (pages[i])
149                         __free_pages(pages[i], 0);
150
151         ptlrpc_free_bulk(desc);
152  out_free:
153         OBD_FREE(pages, sizeof(*pages) * npages);
154  out:
155         return rc;
156 }
157
158 int mds_lock_mode_for_dir(struct obd_device *obd,
159                               struct dentry *dentry, int mode)
160 {
161         int ret_mode, split;
162
163         /* any dir access needs couple locks:
164          * 1) on part of dir we gonna lookup/modify in
165          * 2) on a whole dir to protect it from concurrent splitting
166          *    and to flush client's cache for readdir()
167          * so, for a given mode and dentry this routine decides what
168          * lock mode to use for lock #2:
169          * 1) if caller's gonna lookup in dir then we need to protect
170          *    dir from being splitted only - LCK_CR
171          * 2) if caller's gonna modify dir then we need to protect
172          *    dir from being splitted and to flush cache - LCK_CW
173          * 3) if caller's gonna modify dir and that dir seems ready
174          *    for splitting then we need to protect it from any
175          *    type of access (lookup/modify/split) - LCK_EX -bzzz */
176
177         split = mds_splitting_expected(obd, dentry);
178         if (split == MDS_NO_SPLITTABLE) {
179                 /* this inode won't be splitted. so we need not to protect from
180                  * just flush client's cache on modification */
181                 ret_mode = 0;
182                 if (mode == LCK_PW)
183                         ret_mode = LCK_CW;
184         } else {
185                 if (mode == LCK_PR) {
186                         ret_mode = LCK_CR;
187                 } else if (mode == LCK_PW) {
188                         /* caller gonna modify directory.we use concurrent
189                            write lock here to retract client's cache for readdir */
190                         ret_mode = LCK_CW;
191                         if (split == MDS_EXPECT_SPLIT) {
192                                 /* splitting possible. serialize any access */
193                                 CDEBUG(D_OTHER, "%s: gonna split %u/%u\n",
194                                        obd->obd_name,
195                                        (unsigned) dentry->d_inode->i_ino,
196                                        (unsigned) dentry->d_inode->i_generation);
197                                 ret_mode = LCK_EX;
198                         }
199                 } else {
200                         CWARN("unexpected lock mode %d\n", mode);
201                         ret_mode = LCK_EX;
202                 }
203         }
204         return ret_mode;
205 }
206
207 /* only valid locked dentries or errors should be returned */
208 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
209                                      struct vfsmount **mnt, int lock_mode,
210                                      struct lustre_handle *lockh, int *mode,
211                                      char *name, int namelen, __u64 lockpart)
212 {
213         struct mds_obd *mds = &obd->u.mds;
214         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
215         struct ldlm_res_id res_id = { .name = {0} };
216         int flags = 0, rc;
217         ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
218
219         ENTRY;
220
221         if (IS_ERR(de))
222                 RETURN(de);
223
224         res_id.name[0] = de->d_inode->i_ino;
225         res_id.name[1] = de->d_inode->i_generation;
226         lockh[1].cookie = 0;
227 #ifdef S_PDIROPS
228         if (name && IS_PDIROPS(de->d_inode)) {
229                 ldlm_policy_data_t cpolicy =
230                         { .l_inodebits = { MDS_INODELOCK_UPDATE } };
231                 LASSERT(mode != NULL);
232                 *mode = mds_lock_mode_for_dir(obd, de, lock_mode);
233                 if (*mode) {
234                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
235                                               res_id, LDLM_IBITS,
236                                               &cpolicy, *mode, &flags,
237                                               mds_blocking_ast,
238                                               ldlm_completion_ast, NULL, NULL,
239                                               NULL, 0, NULL, lockh + 1);
240                         if (rc != ELDLM_OK) {
241                                 l_dput(de);
242                                 RETURN(ERR_PTR(-ENOLCK));
243                         }
244                 }
245                 flags = 0;
246
247                 res_id.name[2] = full_name_hash(name, namelen);
248
249                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
250                        de->d_inode->i_ino, de->d_inode->i_generation,
251                        res_id.name[2]);
252         }
253 #endif
254         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
255                               LDLM_IBITS, &policy, lock_mode, &flags,
256                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
257                               NULL, 0, NULL, lockh);
258         if (rc != ELDLM_OK) {
259                 l_dput(de);
260                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
261 #ifdef S_PDIROPS
262                 if (lockh[1].cookie)
263                         ldlm_lock_decref(lockh + 1, LCK_CW);
264 #endif
265         }
266
267         RETURN(retval);
268 }
269
270 #ifndef DCACHE_DISCONNECTED
271 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
272 #endif
273
274
275 /* Look up an entry by inode number. */
276 /* this function ONLY returns valid dget'd dentries with an initialized inode
277    or errors */
278 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
279                               struct vfsmount **mnt)
280 {
281         char fid_name[32];
282         unsigned long ino = fid->id;
283         __u32 generation = fid->generation;
284         struct inode *inode;
285         struct dentry *result;
286
287         if (ino == 0)
288                 RETURN(ERR_PTR(-ESTALE));
289
290         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
291
292         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
293                ino, generation, mds->mds_sb);
294
295         /* under ext3 this is neither supposed to return bad inodes
296            nor NULL inodes. */
297         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
298         if (IS_ERR(result))
299                 RETURN(result);
300
301         inode = result->d_inode;
302         if (!inode)
303                 RETURN(ERR_PTR(-ENOENT));
304
305 #if 0
306         /* here we disabled generation check, as root inode i_generation
307          * of cache mds and real mds are different. */
308         if (generation && inode->i_generation != generation) {
309                 /* we didn't find the right inode.. */
310                 CERROR("bad inode %lu, link: %lu ct: %d or generation %u/%u\n",
311                        inode->i_ino, (unsigned long)inode->i_nlink,
312                        atomic_read(&inode->i_count), inode->i_generation,
313                        generation);
314                 dput(result);
315                 RETURN(ERR_PTR(-ENOENT));
316         }
317 #endif
318
319         if (mnt) {
320                 *mnt = mds->mds_vfsmnt;
321                 mntget(*mnt);
322         }
323
324         RETURN(result);
325 }
326
327
328 /* Establish a connection to the MDS.
329  *
330  * This will set up an export structure for the client to hold state data
331  * about that client, like open files, the last operation number it did
332  * on the server, etc.
333  */
334 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
335                        struct obd_uuid *cluuid)
336 {
337         struct obd_export *exp;
338         struct mds_export_data *med; /*  */
339         struct mds_client_data *mcd;
340         int rc, abort_recovery;
341         ENTRY;
342
343         if (!conn || !obd || !cluuid)
344                 RETURN(-EINVAL);
345
346         /* Check for aborted recovery. */
347         spin_lock_bh(&obd->obd_processing_task_lock);
348         abort_recovery = obd->obd_abort_recovery;
349         spin_unlock_bh(&obd->obd_processing_task_lock);
350         if (abort_recovery)
351                 target_abort_recovery(obd);
352
353         /* XXX There is a small race between checking the list and adding a
354          * new connection for the same UUID, but the real threat (list
355          * corruption when multiple different clients connect) is solved.
356          *
357          * There is a second race between adding the export to the list,
358          * and filling in the client data below.  Hence skipping the case
359          * of NULL mcd above.  We should already be controlling multiple
360          * connects at the client, and we can't hold the spinlock over
361          * memory allocations without risk of deadlocking.
362          */
363         rc = class_connect(conn, obd, cluuid);
364         if (rc)
365                 RETURN(rc);
366         exp = class_conn2export(conn);
367         LASSERT(exp);
368         med = &exp->exp_mds_data;
369
370         OBD_ALLOC(mcd, sizeof(*mcd));
371         if (!mcd) {
372                 CERROR("mds: out of memory for client data\n");
373                 GOTO(out, rc = -ENOMEM);
374         }
375
376         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
377         med->med_mcd = mcd;
378
379         rc = mds_client_add(obd, &obd->u.mds, med, -1);
380         if (rc == 0)
381                 EXIT;
382 out:
383         if (rc) {
384                 OBD_FREE(mcd, sizeof(*mcd));
385                 class_disconnect(exp, 0);
386         }
387         class_export_put(exp);
388
389         return rc;
390 }
391
392 static int mds_init_export(struct obd_export *exp)
393 {
394         struct mds_export_data *med = &exp->exp_mds_data;
395
396         INIT_LIST_HEAD(&med->med_open_head);
397         spin_lock_init(&med->med_open_lock);
398         RETURN(0);
399 }
400
401 static int mds_destroy_export(struct obd_export *export)
402 {
403         struct mds_export_data *med;
404         struct obd_device *obd = export->exp_obd;
405         struct lvfs_run_ctxt saved;
406         int rc = 0;
407         ENTRY;
408
409         med = &export->exp_mds_data;
410         target_destroy_export(export);
411
412         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
413                 GOTO(out, 0);
414
415         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
416
417         /* Close any open files (which may also cause orphan unlinking). */
418         spin_lock(&med->med_open_lock);
419         while (!list_empty(&med->med_open_head)) {
420                 struct list_head *tmp = med->med_open_head.next;
421                 struct mds_file_data *mfd =
422                         list_entry(tmp, struct mds_file_data, mfd_list);
423                 BDEVNAME_DECLARE_STORAGE(btmp);
424
425                 /* bug 1579: fix force-closing for 2.5 */
426                 struct dentry *dentry = mfd->mfd_dentry;
427
428                 list_del(&mfd->mfd_list);
429                 spin_unlock(&med->med_open_lock);
430
431                 /* If you change this message, be sure to update
432                  * replay_single:test_46 */
433                 CERROR("force closing client file handle for %*s (%s:%lu)\n",
434                        dentry->d_name.len, dentry->d_name.name,
435                        ll_bdevname(dentry->d_inode->i_sb, btmp),
436                        dentry->d_inode->i_ino);
437                 rc = mds_mfd_close(NULL, obd, mfd,
438                                    !(export->exp_flags & OBD_OPT_FAILOVER));
439
440                 if (rc)
441                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
442                 spin_lock(&med->med_open_lock);
443         }
444         spin_unlock(&med->med_open_lock);
445         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
446
447 out:
448         mds_client_free(export, !(export->exp_flags & OBD_OPT_FAILOVER));
449
450         RETURN(rc);
451 }
452
453 static int mds_disconnect(struct obd_export *exp, int flags)
454 {
455         struct obd_device *obd;
456         struct mds_obd *mds;
457         unsigned long irqflags;
458         int rc;
459         ENTRY;
460
461         LASSERT(exp);
462         class_export_get(exp);
463
464         obd = class_exp2obd(exp);
465         if (obd == NULL) {
466                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
467                        exp->exp_handle.h_cookie);
468                 RETURN(-EINVAL);
469         }
470         mds = &obd->u.mds;
471
472         if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)
473                         && !atomic_read(&mds->mds_real_clients)) {
474                 /* there was no client at all */
475                 mds_lmv_disconnect(obd, flags);
476         }
477
478         if ((exp->exp_flags & OBD_OPT_REAL_CLIENT)
479                         && atomic_dec_and_test(&mds->mds_real_clients)) {
480                 /* time to drop LMV connections */
481                 CDEBUG(D_OTHER, "%s: last real client %s disconnected.  "
482                        "Disconnnect from LMV now\n",
483                        obd->obd_name, exp->exp_client_uuid.uuid);
484                 mds_lmv_disconnect(obd, flags);
485         }
486
487         spin_lock_irqsave(&exp->exp_lock, irqflags);
488         exp->exp_flags = flags;
489         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
490
491         /* Disconnect early so that clients can't keep using export */
492         rc = class_disconnect(exp, flags);
493         ldlm_cancel_locks_for_export(exp);
494
495         /* complete all outstanding replies */
496         spin_lock_irqsave(&exp->exp_lock, irqflags);
497         while (!list_empty(&exp->exp_outstanding_replies)) {
498                 struct ptlrpc_reply_state *rs =
499                         list_entry(exp->exp_outstanding_replies.next,
500                                    struct ptlrpc_reply_state, rs_exp_list);
501                 struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
502
503                 spin_lock(&svc->srv_lock);
504                 list_del_init(&rs->rs_exp_list);
505                 ptlrpc_schedule_difficult_reply(rs);
506                 spin_unlock(&svc->srv_lock);
507         }
508         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
509
510         class_export_put(exp);
511         RETURN(rc);
512 }
513
514 static int mds_getstatus(struct ptlrpc_request *req)
515 {
516         struct mds_obd *mds = mds_req2mds(req);
517         struct mds_body *body;
518         int rc, size = sizeof(*body);
519         ENTRY;
520
521         rc = lustre_pack_reply(req, 1, &size, NULL);
522         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
523                 CERROR("mds: out of memory for message: size=%d\n", size);
524                 req->rq_status = -ENOMEM;       /* superfluous? */
525                 RETURN(-ENOMEM);
526         }
527
528         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
529         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
530
531         /* the last_committed and last_xid fields are filled in for all
532          * replies already - no need to do so here also.
533          */
534         RETURN(0);
535 }
536
537 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
538                      void *data, int flag)
539 {
540         int do_ast;
541         ENTRY;
542
543         if (flag == LDLM_CB_CANCELING) {
544                 /* Don't need to do anything here. */
545                 RETURN(0);
546         }
547
548         /* XXX layering violation!  -phil */
549         l_lock(&lock->l_resource->lr_namespace->ns_lock);
550         /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
551          * such that mds_blocking_ast is called just before l_i_p takes the
552          * ns_lock, then by the time we get the lock, we might not be the
553          * correct blocking function anymore.  So check, and return early, if
554          * so. */
555         if (lock->l_blocking_ast != mds_blocking_ast) {
556                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
557                 RETURN(0);
558         }
559
560         lock->l_flags |= LDLM_FL_CBPENDING;
561         do_ast = (!lock->l_readers && !lock->l_writers);
562         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
563
564         if (do_ast) {
565                 struct lustre_handle lockh;
566                 int rc;
567
568                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
569                 ldlm_lock2handle(lock, &lockh);
570                 rc = ldlm_cli_cancel(&lockh);
571                 if (rc < 0)
572                         CERROR("ldlm_cli_cancel: %d\n", rc);
573         } else {
574                 LDLM_DEBUG(lock, "Lock still has references, will be "
575                            "cancelled later");
576         }
577         RETURN(0);
578 }
579
580 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
581                int *size, int lock)
582 {
583         int rc = 0;
584         int lmm_size;
585
586         if (lock)
587                 down(&inode->i_sem);
588         rc = fsfilt_get_md(obd, inode, md, *size);
589         if (lock)
590                 up(&inode->i_sem);
591
592         if (rc < 0) {
593                 CERROR("Error %d reading eadata for ino %lu\n",
594                        rc, inode->i_ino);
595         } else if (rc > 0) {
596                 lmm_size = rc;
597                 
598                 if (S_ISREG(inode->i_mode))
599                         rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
600
601                 if (rc == 0) {
602                         *size = lmm_size;
603                         rc = lmm_size;
604                 } else if (rc > 0) {
605                         *size = rc;
606                 }
607         }
608
609         RETURN (rc);
610 }
611
612
613 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
614  * Call with lock=0 if the caller has already taken the i_sem. */
615 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
616                 struct mds_body *body, struct inode *inode, int lock)
617 {
618         struct mds_obd *mds = &obd->u.mds;
619         void *lmm;
620         int lmm_size;
621         int rc;
622         ENTRY;
623
624         lmm = lustre_msg_buf(msg, offset, 0);
625         if (lmm == NULL) {
626                 /* Some problem with getting eadata when I sized the reply
627                  * buffer... */
628                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
629                        inode->i_ino);
630                 RETURN(0);
631         }
632         lmm_size = msg->buflens[offset];
633
634         /* I don't really like this, but it is a sanity check on the client
635          * MD request.  However, if the client doesn't know how much space
636          * to reserve for the MD, it shouldn't be bad to have too much space.
637          */
638         if (lmm_size > mds->mds_max_mdsize) {
639                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
640                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
641                 // RETURN(-EINVAL);
642         }
643
644         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
645         if (rc > 0) {
646                 if (S_ISDIR(inode->i_mode))
647                         body->valid |= OBD_MD_FLDIREA;
648                 else
649                         body->valid |= OBD_MD_FLEASIZE;
650                 body->eadatasize = lmm_size;
651                 rc = 0;
652         }
653
654         RETURN(rc);
655 }
656
657 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
658                                 struct ptlrpc_request *req,
659                                 struct mds_body *reqbody, int reply_off)
660 {
661         struct mds_body *body;
662         struct inode *inode = dentry->d_inode;
663         int rc = 0;
664         ENTRY;
665
666         if (inode == NULL && !(dentry->d_flags & DCACHE_CROSS_REF))
667                 RETURN(-ENOENT);
668
669         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
670         LASSERT(body != NULL);                 /* caller prepped reply */
671
672         if (dentry->d_flags & DCACHE_CROSS_REF) {
673                 CDEBUG(D_OTHER, "cross reference: %lu/%lu/%lu\n",
674                        (unsigned long) dentry->d_mdsnum,
675                        (unsigned long) dentry->d_inum,
676                        (unsigned long) dentry->d_generation);
677                 body->valid |= OBD_MD_FLID | OBD_MD_MDS;
678                 body->fid1.id = dentry->d_inum;
679                 body->fid1.mds = dentry->d_mdsnum;
680                 body->fid1.generation = dentry->d_generation;
681                 RETURN(0);
682         }
683         mds_pack_inode2fid(obd, &body->fid1, inode);
684         mds_pack_inode2body(obd, body, inode);
685
686         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
687             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
688                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1, body,
689                                  inode, 1);
690
691                 /* If we have LOV EA data, the OST holds size, atime, mtime */
692                 if (!(body->valid & OBD_MD_FLEASIZE) &&
693                     !(body->valid & OBD_MD_FLDIREA))
694                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
695                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
696         } else if (S_ISLNK(inode->i_mode) &&
697                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
698                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
699                 int len;
700
701                 LASSERT (symname != NULL);       /* caller prepped reply */
702                 len = req->rq_repmsg->buflens[reply_off + 1];
703
704                 rc = inode->i_op->readlink(dentry, symname, len);
705                 if (rc < 0) {
706                         CERROR("readlink failed: %d\n", rc);
707                 } else if (rc != len - 1) {
708                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
709                                 rc, len - 1);
710                         rc = -EINVAL;
711                 } else {
712                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
713                         body->valid |= OBD_MD_LINKNAME;
714                         body->eadatasize = rc + 1;
715                         symname[rc] = 0;        /* NULL terminate */
716                         rc = 0;
717                 }
718         }
719
720         RETURN(rc);
721 }
722
723 static int mds_getattr_pack_msg_cf(struct ptlrpc_request *req,
724                                         struct dentry *dentry,
725                                         int offset)
726 {
727         int rc = 0, size[1] = {sizeof(struct mds_body)};
728         ENTRY;
729
730         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
731                 CERROR("failed MDS_GETATTR_PACK test\n");
732                 req->rq_status = -ENOMEM;
733                 GOTO(out, rc = -ENOMEM);
734         }
735
736         rc = lustre_pack_reply(req, 1, size, NULL);
737         if (rc) {
738                 CERROR("out of memory\n");
739                 GOTO(out, req->rq_status = rc);
740         }
741
742         EXIT;
743  out:
744         return(rc);
745 }
746
747 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
748                                 int offset)
749 {
750         struct mds_obd *mds = mds_req2mds(req);
751         struct mds_body *body;
752         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
753         ENTRY;
754
755         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
756         LASSERT(body != NULL);                 /* checked by caller */
757         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
758
759         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
760             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
761                 int rc;
762                 down(&inode->i_sem);
763                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
764                 up(&inode->i_sem);
765                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
766                        rc, inode->i_ino);
767                 if (rc < 0) {
768                         if (rc != -ENODATA)
769                                 CERROR("error getting inode %lu MD: rc = %d\n",
770                                        inode->i_ino, rc);
771                         size[bufcount] = 0;
772                 } else if (rc > mds->mds_max_mdsize) {
773                         size[bufcount] = 0;
774                         CERROR("MD size %d larger than maximum possible %u\n",
775                                rc, mds->mds_max_mdsize);
776                 } else {
777                         size[bufcount] = rc;
778                 }
779                 bufcount++;
780         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
781                 if (inode->i_size + 1 != body->eadatasize)
782                         CERROR("symlink size: %Lu, reply space: %d\n",
783                                inode->i_size + 1, body->eadatasize);
784                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
785                 bufcount++;
786                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
787                        inode->i_size + 1, body->eadatasize);
788         }
789
790         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
791                 CERROR("failed MDS_GETATTR_PACK test\n");
792                 req->rq_status = -ENOMEM;
793                 GOTO(out, rc = -ENOMEM);
794         }
795
796         rc = lustre_pack_reply(req, bufcount, size, NULL);
797         if (rc) {
798                 CERROR("out of memory\n");
799                 GOTO(out, req->rq_status = rc);
800         }
801
802         EXIT;
803  out:
804         return(rc);
805 }
806
807 int mds_check_mds_num(struct obd_device *obd, struct inode* inode,
808                       char *name, int namelen)
809 {
810         struct mea *mea = NULL;
811         int mea_size, rc = 0;
812         ENTRY;
813                                                                                                                                                                                                      
814         rc = mds_get_lmv_attr(obd, inode, &mea, &mea_size);
815         if (rc)
816                 RETURN(rc);
817         if (mea != NULL) {
818                 /* dir is already splitted, check is requested filename
819                  * should live at this MDS or at another one */
820                 int i;
821                 i = mea_name2idx(mea, name, namelen - 1);
822                 if (mea->mea_master != i) {
823                         CERROR("inapropriate MDS(%d) for %s. should be %d\n",
824                                 mea->mea_master, name, i);
825                         rc = -ERESTART;
826                 }
827         }
828                                                                                                                                                                                                      
829         if (mea)
830                 OBD_FREE(mea, mea_size);
831         RETURN(rc);
832 }
833
834 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
835                             struct lustre_handle *child_lockh, int child_part)
836 {
837         struct obd_device *obd = req->rq_export->exp_obd;
838         struct ldlm_reply *rep = NULL;
839         struct lvfs_run_ctxt saved;
840         struct mds_body *body;
841         struct dentry *dparent = NULL, *dchild = NULL;
842         struct lvfs_ucred uc;
843         struct lustre_handle parent_lockh[2];
844         int namesize, update_mode;
845         int rc = 0, cleanup_phase = 0, resent_req = 0;
846         char *name;
847         ENTRY;
848
849         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
850
851         /* Swab now, before anyone looks inside the request */
852
853         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
854                                   lustre_swab_mds_body);
855         if (body == NULL) {
856                 CERROR("Can't swab mds_body\n");
857                 GOTO(cleanup, rc = -EFAULT);
858         }
859
860         LASSERT_REQSWAB(req, offset + 1);
861         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
862         if (name == NULL) {
863                 CERROR("Can't unpack name\n");
864                 GOTO(cleanup, rc = -EFAULT);
865         }
866         namesize = req->rq_reqmsg->buflens[offset + 1];
867
868         LASSERT (offset == 0 || offset == 2);
869         /* if requests were at offset 2, the getattr reply goes back at 1 */
870         if (offset) {
871                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
872                 offset = 1;
873         }
874
875         uc.luc_fsuid = body->fsuid;
876         uc.luc_fsgid = body->fsgid;
877         uc.luc_cap = body->capability;
878         uc.luc_suppgid1 = body->suppgid;
879         uc.luc_suppgid2 = -1;
880         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
881         cleanup_phase = 1; /* kernel context */
882         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
883
884         LASSERT(namesize > 0);
885         if (namesize == 1) {
886                 /* we have no dentry here, drop LOOKUP bit */
887                 child_part &= ~MDS_INODELOCK_LOOKUP;
888                 CDEBUG(D_OTHER, "%s: request to retrieve attrs for %lu/%lu\n",
889                        obd->obd_name, (unsigned long) body->fid1.id,
890                        (unsigned long) body->fid1.generation);
891                 dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
892                                                parent_lockh, &update_mode, 
893                                                NULL, 0, child_part);
894                 if (IS_ERR(dchild)) {
895                         CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
896                         GOTO(cleanup, rc = PTR_ERR(dchild));
897                 }
898                 memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
899 #ifdef S_PDIROPS
900                 if (parent_lockh[1].cookie)
901                         ldlm_lock_decref(parent_lockh + 1, update_mode);
902 #endif
903                 cleanup_phase = 2;
904                 goto fill_inode;
905         }
906         
907         /* FIXME: handle raw lookup */
908 #if 0
909         if (body->valid == OBD_MD_FLID) {
910                 struct mds_body *mds_reply;
911                 int size = sizeof(*mds_reply);
912                 ino_t inum;
913                 // The user requested ONLY the inode number, so do a raw lookup
914                 rc = lustre_pack_reply(req, 1, &size, NULL);
915                 if (rc) {
916                         CERROR("out of memory\n");
917                         GOTO(cleanup, rc);
918                 }
919
920                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
921
922                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
923                                            sizeof(*mds_reply));
924                 mds_reply->fid1.id = inum;
925                 mds_reply->valid = OBD_MD_FLID;
926                 GOTO(cleanup, rc);
927         }
928 #endif
929
930         if (child_lockh->cookie != 0) {
931                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
932                 resent_req = 1;
933         }
934
935         if (resent_req == 0) {
936                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
937                                                  parent_lockh, &dparent,
938                                                  LCK_PR, MDS_INODELOCK_LOOKUP,
939                                                  &update_mode, name, namesize,
940                                                  child_lockh, &dchild, LCK_PR,
941                                                  child_part);
942                 if (rc)
943                         GOTO(cleanup, rc);
944         } else {
945                 struct ldlm_lock *granted_lock;
946                 struct ll_fid child_fid;
947                 struct ldlm_resource *res;
948                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
949                 granted_lock = ldlm_handle2lock(child_lockh);
950                 LASSERT(granted_lock);
951
952                 res = granted_lock->l_resource;
953                 child_fid.id = res->lr_name.name[0];
954                 child_fid.generation = res->lr_name.name[1];
955                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
956                 LASSERT(dchild);
957                 LDLM_LOCK_PUT(granted_lock);
958         }
959
960         cleanup_phase = 2; /* dchild, dparent, locks */
961
962         /* let's make sure this name should leave on this mds node */
963         rc = mds_check_mds_num(obd, dparent->d_inode, name, namesize);
964         if (rc)
965                 GOTO(cleanup, rc);
966
967 fill_inode:
968
969         if (!DENTRY_VALID(dchild)) {
970                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
971                 /* in the intent case, the policy clears this error:
972                    the disposition is enough */
973                 rc = -ENOENT;
974                 GOTO(cleanup, rc);
975         } else {
976                 intent_set_disposition(rep, DISP_LOOKUP_POS);
977         }
978
979         if (req->rq_repmsg == NULL) {
980                 if (dchild->d_flags & DCACHE_CROSS_REF)
981                         rc = mds_getattr_pack_msg_cf(req, dchild, offset);
982                 else
983                         rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
984                 if (rc != 0) {
985                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
986                         GOTO (cleanup, rc);
987                 }
988         }
989
990         rc = mds_getattr_internal(obd, dchild, req, body, offset);
991         GOTO(cleanup, rc); /* returns the lock to the client */
992
993  cleanup:
994         switch (cleanup_phase) {
995         case 2:
996                 if (resent_req == 0) {
997                         if (rc && DENTRY_VALID(dchild))
998                                 ldlm_lock_decref(child_lockh, LCK_PR);
999                         if (dparent) {
1000                                 ldlm_lock_decref(parent_lockh, LCK_PR);
1001 #ifdef S_PDIROPS
1002                                 if (parent_lockh[1].cookie != 0)
1003                                         ldlm_lock_decref(parent_lockh + 1,
1004                                                          update_mode);
1005 #endif
1006                         }
1007                         if (dparent)
1008                                 l_dput(dparent);
1009                 }
1010                 l_dput(dchild);
1011         case 1:
1012                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1013         default: ;
1014         }
1015         return rc;
1016 }
1017
1018 static int mds_getattr(int offset, struct ptlrpc_request *req)
1019 {
1020         struct mds_obd *mds = mds_req2mds(req);
1021         struct obd_device *obd = req->rq_export->exp_obd;
1022         struct lvfs_run_ctxt saved;
1023         struct dentry *de;
1024         struct mds_body *body;
1025         struct lvfs_ucred uc;
1026         int rc = 0;
1027         ENTRY;
1028
1029         body = lustre_swab_reqbuf (req, offset, sizeof (*body),
1030                                    lustre_swab_mds_body);
1031         if (body == NULL) {
1032                 CERROR ("Can't unpack body\n");
1033                 RETURN (-EFAULT);
1034         }
1035
1036         uc.luc_fsuid = body->fsuid;
1037         uc.luc_fsgid = body->fsgid;
1038         uc.luc_cap = body->capability;
1039         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1040         de = mds_fid2dentry(mds, &body->fid1, NULL);
1041         if (IS_ERR(de)) {
1042                 rc = req->rq_status = -ENOENT;
1043                 GOTO(out_pop, PTR_ERR(de));
1044         }
1045
1046         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1047         if (rc != 0) {
1048                 CERROR ("mds_getattr_pack_msg: %d\n", rc);
1049                 GOTO (out_pop, rc);
1050         }
1051
1052         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
1053
1054         l_dput(de);
1055         GOTO(out_pop, rc);
1056 out_pop:
1057         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1058         return rc;
1059 }
1060
1061
1062 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1063                           unsigned long max_age)
1064 {
1065         int rc;
1066
1067         spin_lock(&obd->obd_osfs_lock);
1068         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age);
1069         if (rc == 0)
1070                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1071         spin_unlock(&obd->obd_osfs_lock);
1072
1073         return rc;
1074 }
1075
1076 static int mds_statfs(struct ptlrpc_request *req)
1077 {
1078         struct obd_device *obd = req->rq_export->exp_obd;
1079         int rc, size = sizeof(struct obd_statfs);
1080         ENTRY;
1081
1082         rc = lustre_pack_reply(req, 1, &size, NULL);
1083         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1084                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1085                 GOTO(out, rc);
1086         }
1087
1088         /* We call this so that we can cache a bit - 1 jiffie worth */
1089         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1090                             jiffies - HZ);
1091         if (rc) {
1092                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1093                 GOTO(out, rc);
1094         }
1095
1096         EXIT;
1097 out:
1098         req->rq_status = rc;
1099         return 0;
1100 }
1101
1102 static int mds_sync(struct ptlrpc_request *req)
1103 {
1104         struct obd_device *obd = req->rq_export->exp_obd;
1105         struct mds_obd *mds = &obd->u.mds;
1106         struct mds_body *body;
1107         int rc, size = sizeof(*body);
1108         ENTRY;
1109
1110         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
1111         if (body == NULL)
1112                 GOTO(out, rc = -EPROTO);
1113
1114         rc = lustre_pack_reply(req, 1, &size, NULL);
1115         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1116                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1117                 GOTO(out, rc);
1118         }
1119
1120         if (body->fid1.id == 0) {
1121                 /* a fid of zero is taken to mean "sync whole filesystem" */
1122                 rc = fsfilt_sync(obd, mds->mds_sb);
1123                 if (rc)
1124                         GOTO(out, rc);
1125         } else {
1126                 /* just any file to grab fsync method - "file" arg unused */
1127                 struct file *file = mds->mds_rcvd_filp;
1128                 struct dentry *de;
1129
1130                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1131                 if (IS_ERR(de))
1132                         GOTO(out, rc = PTR_ERR(de));
1133
1134                 rc = file->f_op->fsync(NULL, de, 1);
1135                 l_dput(de);
1136                 if (rc)
1137                         GOTO(out, rc);
1138
1139                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1140                 mds_pack_inode2fid(obd, &body->fid1, de->d_inode);
1141                 mds_pack_inode2body(obd, body, de->d_inode);
1142         }
1143 out:
1144         req->rq_status = rc;
1145         return 0;
1146 }
1147
1148 /* mds_readpage does not take a DLM lock on the inode, because the client must
1149  * already have a PR lock.
1150  *
1151  * If we were to take another one here, a deadlock will result, if another
1152  * thread is already waiting for a PW lock. */
1153 static int mds_readpage(struct ptlrpc_request *req)
1154 {
1155         struct obd_device *obd = req->rq_export->exp_obd;
1156         struct vfsmount *mnt;
1157         struct dentry *de;
1158         struct file *file;
1159         struct mds_body *body, *repbody;
1160         struct lvfs_run_ctxt saved;
1161         int rc, size = sizeof(*repbody);
1162         struct lvfs_ucred uc;
1163         ENTRY;
1164
1165         rc = lustre_pack_reply(req, 1, &size, NULL);
1166         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1167                 CERROR("mds: out of memory\n");
1168                 GOTO(out, rc = -ENOMEM);
1169         }
1170
1171         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
1172         if (body == NULL)
1173                 GOTO (out, rc = -EFAULT);
1174
1175         uc.luc_fsuid = body->fsuid;
1176         uc.luc_fsgid = body->fsgid;
1177         uc.luc_cap = body->capability;
1178         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1179         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1180         if (IS_ERR(de))
1181                 GOTO(out_pop, rc = PTR_ERR(de));
1182
1183         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1184
1185         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1186         /* note: in case of an error, dentry_open puts dentry */
1187         if (IS_ERR(file))
1188                 GOTO(out_pop, rc = PTR_ERR(file));
1189
1190         /* body->size is actually the offset -eeb */
1191         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1192                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1193                        body->size, de->d_inode->i_blksize);
1194                 GOTO(out_file, rc = -EFAULT);
1195         }
1196
1197         /* body->nlink is actually the #bytes to read -eeb */
1198         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1199                 CERROR("size %u is not multiple of blocksize %lu\n",
1200                        body->nlink, de->d_inode->i_blksize);
1201                 GOTO(out_file, rc = -EFAULT);
1202         }
1203
1204         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1205         repbody->size = file->f_dentry->d_inode->i_size;
1206         repbody->valid = OBD_MD_FLSIZE;
1207
1208         /* to make this asynchronous make sure that the handling function
1209            doesn't send a reply when this function completes. Instead a
1210            callback function would send the reply */
1211         /* body->size is actually the offset -eeb */
1212         rc = mds_sendpage(req, file, body->size, body->nlink);
1213
1214 out_file:
1215         filp_close(file, 0);
1216 out_pop:
1217         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1218 out:
1219         req->rq_status = rc;
1220         RETURN(0);
1221 }
1222
1223 int mds_reint(struct ptlrpc_request *req, int offset,
1224               struct lustre_handle *lockh)
1225 {
1226         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1227         int rc;
1228         ENTRY;
1229
1230         OBD_ALLOC(rec, sizeof(*rec));
1231         if (rec == NULL)
1232                 RETURN(-ENOMEM);
1233
1234         rc = mds_update_unpack(req, offset, rec);
1235         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1236                 CERROR("invalid record\n");
1237                 GOTO(out, req->rq_status = -EINVAL);
1238         }
1239         /* rc will be used to interrupt a for loop over multiple records */
1240         rc = mds_reint_rec(rec, offset, req, lockh);
1241  out:
1242         OBD_FREE(rec, sizeof(*rec));
1243         RETURN(rc);
1244 }
1245
1246 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1247                                        struct obd_device *obd, int *process)
1248 {
1249         switch (req->rq_reqmsg->opc) {
1250         case MDS_CONNECT: /* This will never get here, but for completeness. */
1251         case OST_CONNECT: /* This will never get here, but for completeness. */
1252         case MDS_DISCONNECT:
1253         case OST_DISCONNECT:
1254                *process = 1;
1255                RETURN(0);
1256
1257         case MDS_CLOSE:
1258         case MDS_SYNC: /* used in unmounting */
1259         case OBD_PING:
1260         case MDS_REINT:
1261         case LDLM_ENQUEUE:
1262         case OST_CREATE:
1263                 *process = target_queue_recovery_request(req, obd);
1264                 RETURN(0);
1265
1266         default:
1267                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1268                 *process = 0;
1269                 /* XXX what should we set rq_status to here? */
1270                 req->rq_status = -EAGAIN;
1271                 RETURN(ptlrpc_error(req));
1272         }
1273 }
1274
1275 static char *reint_names[] = {
1276         [REINT_SETATTR] "setattr",
1277         [REINT_CREATE]  "create",
1278         [REINT_LINK]    "link",
1279         [REINT_UNLINK]  "unlink",
1280         [REINT_RENAME]  "rename",
1281         [REINT_OPEN]    "open",
1282 };
1283
/* Attribute mask passed to obdo_from_inode() when filling a reply obdo. */
#define FILTER_VALID_FLAGS                                              \
        (OBD_MD_FLTYPE  | OBD_MD_FLMODE   | OBD_MD_FLGENER |           \
         OBD_MD_FLSIZE  | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |           \
         OBD_MD_FLATIME | OBD_MD_FLMTIME  | OBD_MD_FLCTIME |           \
         OBD_MD_FLID)
1288
1289 static void reconstruct_create(struct ptlrpc_request *req)
1290 {
1291         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1292         struct mds_client_data *mcd = med->med_mcd;
1293         struct ost_body *body;
1294
1295         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1296
1297         /* copy rc, transno and disp; steal locks */
1298         mds_req_from_mcd(req, mcd);
1299         CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
1300 }
1301
1302 static int mdt_obj_create(struct ptlrpc_request *req)
1303 {
1304         struct obd_device *obd = req->rq_export->exp_obd;
1305         struct ldlm_res_id res_id = { .name = {0} };
1306         struct mds_obd *mds = &obd->u.mds;
1307         struct ost_body *body, *repbody;
1308         int rc, size = sizeof(*repbody);
1309         char fidname[LL_FID_NAMELEN];
1310         struct inode *parent_inode;
1311         struct lustre_handle lockh;
1312         struct lvfs_run_ctxt saved;
1313         ldlm_policy_data_t policy;
1314         struct dentry *new = NULL;
1315         struct dentry_params dp;
1316         int mealen, flags = 0;
1317         unsigned int tmpname;
1318         struct lvfs_ucred uc;
1319         struct mea *mea;
1320         void *handle;
1321         ENTRY;
1322        
1323         DEBUG_REQ(D_HA, req, "create remote object");
1324
1325         parent_inode = mds->mds_objects_dir->d_inode;
1326
1327         body = lustre_swab_reqbuf(req, 0, sizeof(*body),
1328                                   lustre_swab_ost_body);
1329         if (body == NULL)
1330                 RETURN(-EFAULT);
1331
1332         MDS_CHECK_RESENT(req, reconstruct_create(req));
1333
1334         uc.luc_fsuid = body->oa.o_uid;
1335         uc.luc_fsgid = body->oa.o_gid;
1336
1337         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1338         
1339         rc = lustre_pack_reply(req, 1, &size, NULL);
1340         if (rc)
1341                 RETURN(rc);
1342
1343         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
1344
1345         if (body->oa.o_flags & OBD_FL_RECREATE_OBJS) {
1346                 /* this is re-create request from MDS holding directory name.
1347                  * we have to lookup given ino/generation first. if it exists
1348                  * (good case) then there is nothing to do. if it does not
1349                  * then we have to recreate it */
1350                 struct ll_fid fid;
1351                 fid.id = body->oa.o_id;
1352                 fid.generation = body->oa.o_generation;
1353                 new = mds_fid2dentry(mds, &fid, NULL);
1354                 if (!IS_ERR(new) && new->d_inode) {
1355                         CWARN("mkdir() repairing is on its way: %lu/%lu\n",
1356                               (unsigned long) fid.id,
1357                               (unsigned long) fid.generation);
1358                         obdo_from_inode(&repbody->oa, new->d_inode,
1359                                         FILTER_VALID_FLAGS);
1360                         repbody->oa.o_id = new->d_inode->i_ino;
1361                         repbody->oa.o_generation = new->d_inode->i_generation;
1362                         repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1363                         GOTO(cleanup2, rc = 0);
1364                 }
1365                 CWARN("hmm. for some reason dir %lu/%lu (or reply) got lost\n",
1366                       (unsigned long) fid.id, (unsigned long) fid.generation);
1367                 LASSERT(new->d_inode == NULL ||
1368                         new->d_inode->i_generation != fid.generation);
1369                 l_dput(new); 
1370         }
1371         
1372         down(&parent_inode->i_sem);
1373         handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
1374         LASSERT(!IS_ERR(handle));
1375
1376 repeat:
1377         tmpname = ll_insecure_random_int();
1378         rc = sprintf(fidname, "%u", tmpname);
1379         new = lookup_one_len(fidname, mds->mds_objects_dir, rc);
1380         if (IS_ERR(new)) {
1381                 CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
1382                        obd->obd_name, fidname, (int) PTR_ERR(new));
1383                 fsfilt_commit(obd, mds->mds_sb, new->d_inode, handle, 0);
1384                 up(&parent_inode->i_sem);
1385                 RETURN(PTR_ERR(new));
1386         } else if (new->d_inode) {
1387                 CERROR("%s: name exists. repeat\n", obd->obd_name);
1388                 goto repeat;
1389         }
1390
1391         new->d_fsdata = (void *) &dp;
1392         dp.p_inum = 0;
1393         dp.p_ptr = req;
1394
1395         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
1396                 DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
1397                           (unsigned long) body->oa.o_id,
1398                           (unsigned long) body->oa.o_generation);
1399                 dp.p_inum = body->oa.o_id;
1400                 dp.p_generation = body->oa.o_generation;
1401         }
1402         rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
1403         if (rc == 0) {
1404                 obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
1405                 repbody->oa.o_id = new->d_inode->i_ino;
1406                 repbody->oa.o_generation = new->d_inode->i_generation;
1407                 repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1408
1409                 rc = fsfilt_del_dir_entry(obd, new);
1410                 up(&parent_inode->i_sem);
1411
1412                 if (rc) {
1413                         CERROR("can't remove name for object: %d\n", rc);
1414                         GOTO(cleanup, rc);
1415                 }
1416                         
1417                 /* this lock should be taken to serialize MDS modifications
1418                  * in failure case */
1419                 res_id.name[0] = new->d_inode->i_ino;
1420                 res_id.name[1] = new->d_inode->i_generation;
1421                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1422                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
1423                                 res_id, LDLM_IBITS, &policy,
1424                                 LCK_EX, &flags, mds_blocking_ast,
1425                                 ldlm_completion_ast, NULL, NULL,
1426                                 NULL, 0, NULL, &lockh);
1427                 if (rc != ELDLM_OK)
1428                         GOTO(cleanup, rc);
1429
1430                 CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
1431                                 (unsigned long) new->d_inode->i_ino,
1432                                 (unsigned long) new->d_inode->i_generation,
1433                                 (unsigned) new->d_inode->i_mode);
1434         } else {
1435                 up(&parent_inode->i_sem);
1436                 CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
1437         }
1438
1439         if (rc == 0 && body->oa.o_valid & OBD_MD_FLID) {
1440                 /* this is new object for splitted dir. we have to
1441                  * prevent recursive splitting on it -bzzz */
1442                 mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
1443                 OBD_ALLOC(mea, mealen);
1444                 if (mea == NULL)
1445                         GOTO(cleanup, rc = -ENOMEM);
1446                 mea->mea_count = 0;
1447                 down(&new->d_inode->i_sem);
1448                 rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
1449                 up(&new->d_inode->i_sem);
1450                 OBD_FREE(mea, mealen);
1451         }
1452
1453 cleanup:
1454         rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
1455         if (rc == 0)
1456                 ptlrpc_save_lock(req, &lockh, LCK_EX);
1457         else
1458                 ldlm_lock_decref(&lockh, LCK_EX);
1459 cleanup2:
1460         l_dput(new);
1461         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1462         RETURN(rc);
1463 }
1464
1465 static int mdt_get_info(struct ptlrpc_request *req)
1466 {
1467         char *key;
1468         struct obd_export *exp = req->rq_export;
1469         int keylen, rc = 0, size = sizeof(obd_id);
1470         obd_id *reply;
1471         ENTRY;
1472
1473         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1474         if (key == NULL) {
1475                 DEBUG_REQ(D_HA, req, "no get_info key");
1476                 RETURN(-EFAULT);
1477         }
1478         keylen = req->rq_reqmsg->buflens[0];
1479
1480         if (keylen < strlen("mdsize") || memcmp(key, "mdsize", 6) != 0)
1481                 RETURN(-EPROTO);
1482
1483         rc = lustre_pack_reply(req, 1, &size, NULL);
1484         if (rc)
1485                 RETURN(rc);
1486
1487         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
1488         rc = obd_get_info(exp, keylen, key, &size, reply);
1489         req->rq_repmsg->status = 0;
1490         RETURN(rc);
1491 }
1492
1493 static int mds_set_info(struct obd_export *exp, __u32 keylen,
1494                         void *key, __u32 vallen, void *val)
1495 {
1496         struct obd_device *obd;
1497         struct mds_obd *mds;
1498         int    rc = 0;
1499         ENTRY;
1500
1501         obd = class_exp2obd(exp);
1502         if (obd == NULL) {
1503                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1504                        exp->exp_handle.h_cookie);
1505                 RETURN(-EINVAL);
1506         }
1507
1508 #define KEY_IS(str) \
1509         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
1510
1511         mds = &obd->u.mds;
1512         if (KEY_IS("mds_num")) {
1513                 int valsize;
1514                 __u32 group;
1515                 CDEBUG(D_IOCTL, "set mds num %d\n", *(int*)val);
1516                 mds->mds_num = *(int*)val;
1517                 group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
1518                 valsize = sizeof(group);
1519                 /*mds number has been changed, so the corresponding obdfilter exp
1520                  *need to be changed too*/
1521                 rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
1522                           valsize, &group);
1523                 RETURN(rc);
1524         } else if (KEY_IS("client")) {
1525                 if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
1526                         atomic_inc(&mds->mds_real_clients);
1527                         CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",
1528                                obd->obd_name, exp->exp_client_uuid.uuid,
1529                                atomic_read(&mds->mds_real_clients));
1530                         exp->exp_flags |= OBD_OPT_REAL_CLIENT;
1531                 }
1532                 if (mds->mds_lmv_name) {
1533                         rc = mds_lmv_connect(obd, mds->mds_lmv_name);
1534                         LASSERT(rc == 0);
1535                 }
1536                 RETURN(0);
1537         }
1538         CDEBUG(D_IOCTL, "invalid key\n");
1539         RETURN(-EINVAL);
1540 }
1541
1542 static int mdt_set_info(struct ptlrpc_request *req)
1543 {
1544         char *key, *val;
1545         struct obd_export *exp = req->rq_export;
1546         int keylen, rc = 0, vallen;
1547         ENTRY;
1548
1549         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1550         if (key == NULL) {
1551                 DEBUG_REQ(D_HA, req, "no set_info key");
1552                 RETURN(-EFAULT);
1553         }
1554         keylen = req->rq_reqmsg->buflens[0];
1555
1556         if (keylen == strlen("mds_num") &&
1557             memcmp(key, "mds_num", keylen) == 0) {
1558                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1559                 if (rc)
1560                         RETURN(rc);
1561                 val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
1562
1563                 vallen = req->rq_reqmsg->buflens[1];
1564
1565                 rc = obd_set_info(exp, keylen, key, vallen, val);
1566                 req->rq_repmsg->status = 0;
1567                 RETURN(rc);
1568         } else if (keylen == strlen("client") &&
1569                    memcmp(key, "client", keylen) == 0) {
1570                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1571                 if (rc)
1572                         RETURN(rc);
1573                 rc = obd_set_info(exp, keylen, key, sizeof(obd_id), NULL);
1574                 req->rq_repmsg->status = 0;
1575                 RETURN(rc);
1576         } 
1577         CDEBUG(D_IOCTL, "invalid key\n");
1578         RETURN(-EINVAL);
1579 }
1580
1581 extern int ost_brw_write(struct ptlrpc_request *, struct obd_trans_info *);
1582 int mds_handle(struct ptlrpc_request *req)
1583 {
1584         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1585         int rc = 0;
1586         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1587         struct obd_device *obd = NULL;
1588         ENTRY;
1589
1590         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1591
1592         LASSERT(current->journal_info == NULL);
1593         /* XXX identical to OST */
1594         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1595                 struct mds_export_data *med;
1596                 int recovering, abort_recovery;
1597
1598                 if (req->rq_export == NULL) {
1599                         CERROR("lustre_mds: operation %d on unconnected MDS\n",
1600                                req->rq_reqmsg->opc);
1601                         req->rq_status = -ENOTCONN;
1602                         GOTO(out, rc = -ENOTCONN);
1603                 }
1604
1605                 med = &req->rq_export->exp_mds_data;
1606                 obd = req->rq_export->exp_obd;
1607                 mds = &obd->u.mds;
1608
1609                 /* sanity check: if the xid matches, the request must
1610                  * be marked as a resent or replayed */
1611                 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1612                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1613                                  (MSG_RESENT | MSG_REPLAY),
1614                                  "rq_xid "LPU64" matches last_xid, "
1615                                  "expected RESENT flag\n",
1616                                  req->rq_xid);
1617                 /* else: note the opposite is not always true; a
1618                  * RESENT req after a failover will usually not match
1619                  * the last_xid, since it was likely never
1620                  * committed. A REPLAYed request will almost never
1621                  * match the last xid, however it could for a
1622                  * committed, but still retained, open. */
1623
1624                 /* Check for aborted recovery. */
1625                 spin_lock_bh(&obd->obd_processing_task_lock);
1626                 abort_recovery = obd->obd_abort_recovery;
1627                 recovering = obd->obd_recovering;
1628                 spin_unlock_bh(&obd->obd_processing_task_lock);
1629                 if (abort_recovery) {
1630                         target_abort_recovery(obd);
1631                 } else if (recovering) {
1632                         rc = mds_filter_recovery_request(req, obd,
1633                                                          &should_process);
1634                         if (rc || !should_process)
1635                                 RETURN(rc);
1636                 }
1637         }
1638
1639         switch (req->rq_reqmsg->opc) {
1640         case MDS_CONNECT:
1641                 DEBUG_REQ(D_INODE, req, "connect");
1642                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1643                 rc = target_handle_connect(req, mds_handle);
1644                 if (!rc)
1645                         /* Now that we have an export, set mds. */
1646                         mds = mds_req2mds(req);
1647                 break;
1648
1649         case MDS_DISCONNECT:
1650                 DEBUG_REQ(D_INODE, req, "disconnect");
1651                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1652                 rc = target_handle_disconnect(req);
1653                 req->rq_status = rc;            /* superfluous? */
1654                 break;
1655
1656         case MDS_GETSTATUS:
1657                 DEBUG_REQ(D_INODE, req, "getstatus");
1658                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1659                 rc = mds_getstatus(req);
1660                 break;
1661
1662         case MDS_GETATTR:
1663                 DEBUG_REQ(D_INODE, req, "getattr");
1664                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1665                 rc = mds_getattr(0, req);
1666                 break;
1667
1668         case MDS_GETATTR_NAME: {
1669                 struct lustre_handle lockh;
1670                 DEBUG_REQ(D_INODE, req, "getattr_name");
1671                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1672
1673                 /* If this request gets a reconstructed reply, we won't be
1674                  * acquiring any new locks in mds_getattr_name, so we don't
1675                  * want to cancel.
1676                  */
1677                 lockh.cookie = 0;
1678                 rc = mds_getattr_name(0, req, &lockh, MDS_INODELOCK_UPDATE);
1679                 /* this non-intent call (from an ioctl) is special */
1680                 req->rq_status = rc;
1681                 if (rc == 0 && lockh.cookie)
1682                         ldlm_lock_decref(&lockh, LCK_PR);
1683                 break;
1684         }
1685         case MDS_STATFS:
1686                 DEBUG_REQ(D_INODE, req, "statfs");
1687                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1688                 rc = mds_statfs(req);
1689                 break;
1690
1691         case MDS_READPAGE:
1692                 DEBUG_REQ(D_INODE, req, "readpage");
1693                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1694                 rc = mds_readpage(req);
1695
1696                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1697                         if (req->rq_reply_state) {
1698                                 lustre_free_reply_state (req->rq_reply_state);
1699                                 req->rq_reply_state = NULL;
1700                         }
1701                         RETURN(0);
1702                 }
1703
1704                 break;
1705
1706         case MDS_REINT: {
1707                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
1708                 __u32  opc;
1709                 int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
1710                                mds->mds_max_cookiesize};
1711                 int bufcount;
1712
1713                 /* NB only peek inside req now; mds_reint() will swab it */
1714                 if (opcp == NULL) {
1715                         CERROR ("Can't inspect opcode\n");
1716                         rc = -EINVAL;
1717                         break;
1718                 }
1719                 opc = *opcp;
1720                 if (lustre_msg_swabbed (req->rq_reqmsg))
1721                         __swab32s(&opc);
1722
1723                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1724                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1725                            reint_names[opc] == NULL) ? reint_names[opc] :
1726                                                        "unknown opcode");
1727
1728                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1729
1730                 if (opc == REINT_UNLINK)
1731                         bufcount = 3;
1732                 else if (opc == REINT_OPEN || opc == REINT_RENAME)
1733                         bufcount = 2;
1734                 else
1735                         bufcount = 1;
1736
1737                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1738                 if (rc)
1739                         break;
1740
1741                 rc = mds_reint(req, 0, NULL);
1742                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1743                 break;
1744         }
1745
1746         case MDS_CLOSE:
1747                 DEBUG_REQ(D_INODE, req, "close");
1748                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1749                 rc = mds_close(req);
1750                 break;
1751
1752         case MDS_DONE_WRITING:
1753                 DEBUG_REQ(D_INODE, req, "done_writing");
1754                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1755                 rc = mds_done_writing(req);
1756                 break;
1757
1758         case MDS_PIN:
1759                 DEBUG_REQ(D_INODE, req, "pin");
1760                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1761                 rc = mds_pin(req);
1762                 break;
1763
1764         case MDS_SYNC:
1765                 DEBUG_REQ(D_INODE, req, "sync");
1766                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1767                 rc = mds_sync(req);
1768                 break;
1769
1770         case OBD_PING:
1771                 DEBUG_REQ(D_INODE, req, "ping");
1772                 rc = target_handle_ping(req);
1773                 break;
1774
1775         case OBD_LOG_CANCEL:
1776                 CDEBUG(D_INODE, "log cancel\n");
1777                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1778                 rc = -ENOTSUPP; /* la la la */
1779                 break;
1780
1781         case LDLM_ENQUEUE:
1782                 DEBUG_REQ(D_INODE, req, "enqueue");
1783                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1784                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1785                                          ldlm_server_blocking_ast, NULL);
1786                 break;
1787         case LDLM_CONVERT:
1788                 DEBUG_REQ(D_INODE, req, "convert");
1789                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1790                 rc = ldlm_handle_convert(req);
1791                 break;
1792         case LDLM_BL_CALLBACK:
1793         case LDLM_CP_CALLBACK:
1794                 DEBUG_REQ(D_INODE, req, "callback");
1795                 CERROR("callbacks should not happen on MDS\n");
1796                 LBUG();
1797                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1798                 break;
1799         case LLOG_ORIGIN_HANDLE_OPEN:
1800                 DEBUG_REQ(D_INODE, req, "llog_init");
1801                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1802                 rc = llog_origin_handle_open(req);
1803                 break;
1804         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1805                 DEBUG_REQ(D_INODE, req, "llog next block");
1806                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1807                 rc = llog_origin_handle_next_block(req);
1808                 break;
1809         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1810                 DEBUG_REQ(D_INODE, req, "llog prev block");
1811                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1812                 rc = llog_origin_handle_prev_block(req);
1813                 break;
1814         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1815                 DEBUG_REQ(D_INODE, req, "llog read header");
1816                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1817                 rc = llog_origin_handle_read_header(req);
1818                 break;
1819         case LLOG_ORIGIN_HANDLE_CLOSE:
1820                 DEBUG_REQ(D_INODE, req, "llog close");
1821                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1822                 rc = llog_origin_handle_close(req);
1823                 break;
1824         case OST_CREATE:
1825                 DEBUG_REQ(D_INODE, req, "ost_create");
1826                 rc = mdt_obj_create(req);
1827                 break;
1828         case OST_GET_INFO:
1829                 DEBUG_REQ(D_INODE, req, "get_info");
1830                 rc = mdt_get_info(req);
1831                 break;
1832         case OST_SET_INFO:
1833                 DEBUG_REQ(D_INODE, req, "set_info");
1834                 rc = mdt_set_info(req);
1835                 break;
1836         case OST_WRITE:
1837                 CDEBUG(D_INODE, "write\n");
1838                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1839                 rc = ost_brw_write(req, NULL);
1840                 LASSERT(current->journal_info == NULL);
1841                 /* mdt_brw sends its own replies */
1842                 RETURN(rc);
1843                 break;
1844         case LLOG_CATINFO:
1845                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1846                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1847                 rc = llog_catinfo(req);
1848                 break;
1849         default:
1850                 req->rq_status = -ENOTSUPP;
1851                 rc = ptlrpc_error(req);
1852                 RETURN(rc);
1853         }
1854
1855         LASSERT(current->journal_info == NULL);
1856
1857         EXIT;
1858
1859         /* If we're DISCONNECTing, the mds_export_data is already freed */
1860         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1861                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1862                 struct obd_device *obd = list_entry(mds, struct obd_device,
1863                                                     u.mds);
1864                 req->rq_repmsg->last_xid =
1865                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1866
1867                 if (!obd->obd_no_transno) {
1868                         req->rq_repmsg->last_committed =
1869                                 obd->obd_last_committed;
1870                 } else {
1871                         DEBUG_REQ(D_IOCTL, req,
1872                                   "not sending last_committed update");
1873                 }
1874                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
1875                        ", xid "LPU64"\n",
1876                        mds->mds_last_transno, obd->obd_last_committed,
1877                        req->rq_xid);
1878         }
1879  out:
1880
1881         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1882                 if (obd && obd->obd_recovering) {
1883                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1884                         return target_queue_final_reply(req, rc);
1885                 }
1886                 /* Lost a race with recovery; let the error path DTRT. */
1887                 rc = req->rq_status = -ENOTCONN;
1888         }
1889
1890         target_send_reply(req, rc, fail);
1891         return 0;
1892 }
1893
1894 /* Update the server data on disk.  This stores the new mount_count and
1895  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1896  * then the server last_rcvd value may be less than that of the clients.
1897  * This will alert us that we may need to do client recovery.
1898  *
1899  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1900  */
1901 int mds_update_server_data(struct obd_device *obd, int force_sync)
1902 {
1903         struct mds_obd *mds = &obd->u.mds;
1904         struct mds_server_data *msd = mds->mds_server_data;
1905         struct file *filp = mds->mds_rcvd_filp;
1906         struct lvfs_run_ctxt saved;
1907         loff_t off = 0;
1908         int rc;
1909         ENTRY;
1910
1911         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1912         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
1913
1914         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1915                mds->mds_mount_count, mds->mds_last_transno);
1916         rc = fsfilt_write_record(obd, filp, msd, sizeof(*msd), &off,force_sync);
1917         if (rc)
1918                 CERROR("error writing MDS server data: rc = %d\n", rc);
1919         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1920
1921         RETURN(rc);
1922 }
1923
1924 /* mount the file system (secretly) */
1925 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1926 {
1927         struct lustre_cfg* lcfg = buf;
1928         struct mds_obd *mds = &obd->u.mds;
1929         char *options = NULL;
1930         struct vfsmount *mnt;
1931         unsigned long page;
1932         int rc = 0;
1933         ENTRY;
1934
1935         dev_clear_rdonly(2);
1936
1937         if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
1938                 RETURN(rc = -EINVAL);
1939
1940         obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
1941         if (IS_ERR(obd->obd_fsops))
1942                 RETURN(rc = PTR_ERR(obd->obd_fsops));
1943
1944         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1945
1946         page = __get_free_page(GFP_KERNEL);
1947         if (!page)
1948                 RETURN(-ENOMEM);
1949
1950         options = (char *)page;
1951         memset(options, 0, PAGE_SIZE);
1952
1953         /* here we use "iopen_nopriv" hardcoded, because it affects MDS utility
1954          * and the rest of options are passed by mount options. Probably this
1955          * should be moved to somewhere else like startup scripts or lconf. */
1956         sprintf(options, "iopen_nopriv");
1957
1958         if (lcfg->lcfg_inllen4 > 0 && lcfg->lcfg_inlbuf4)
1959                 sprintf(options + strlen(options), ",%s",
1960                         lcfg->lcfg_inlbuf4);
1961
1962         /* we have to know mdsnum before touching underlying fs -bzzz */
1963         if (lcfg->lcfg_inllen5 > 0 && lcfg->lcfg_inlbuf5 && 
1964             strcmp(lcfg->lcfg_inlbuf5, "dumb")) {
1965                 class_uuid_t uuid;
1966
1967                 CDEBUG(D_OTHER, "MDS: %s is master for %s\n",
1968                        obd->obd_name, lcfg->lcfg_inlbuf5);
1969
1970                 generate_random_uuid(uuid);
1971                 class_uuid_unparse(uuid, &mds->mds_lmv_uuid);
1972
1973                 OBD_ALLOC(mds->mds_lmv_name, lcfg->lcfg_inllen5);
1974                 if (mds->mds_lmv_name == NULL) 
1975                         RETURN(rc = -ENOMEM);
1976
1977                 memcpy(mds->mds_lmv_name, lcfg->lcfg_inlbuf5,
1978                        lcfg->lcfg_inllen5);
1979                 
1980                 rc = mds_lmv_connect(obd, mds->mds_lmv_name);
1981                 if (rc) {
1982                         OBD_FREE(mds->mds_lmv_name, lcfg->lcfg_inllen5);
1983                         GOTO(err_ops, rc);
1984                 }
1985         }
1986         
1987         /* FIXME-WANGDI: this should be reworked when we will use lmv along 
1988          * with cobd, because correct mdsnum is set in mds_lmv_connect(). */
1989         if (lcfg->lcfg_inllen6 > 0 && lcfg->lcfg_inlbuf6 && !mds->mds_lmv_obd &&
1990             strcmp(lcfg->lcfg_inlbuf6, "dumb")) {
1991                 if (!memcmp(lcfg->lcfg_inlbuf6, "master", strlen("master")) &&
1992                     mds->mds_num == 0) {
1993                         mds->mds_num = REAL_MDS_NUMBER;
1994                 } else if (!memcmp(lcfg->lcfg_inlbuf6, "cache", strlen("cache")) &&
1995                            mds->mds_num == 0) {
1996                         mds->mds_num = CACHE_MDS_NUMBER;
1997                 }     
1998         }
1999
2000         mnt = do_kern_mount(lcfg->lcfg_inlbuf2, 0, 
2001                             lcfg->lcfg_inlbuf1, options);
2002
2003         free_page(page);
2004
2005         if (IS_ERR(mnt)) {
2006                 rc = PTR_ERR(mnt);
2007                 CERROR("do_kern_mount failed: rc = %d\n", rc);
2008                 GOTO(err_ops, rc);
2009         }
2010
2011         CDEBUG(D_SUPER, "%s: mnt = %p\n", lcfg->lcfg_inlbuf1, mnt);
2012
2013         sema_init(&mds->mds_orphan_recovery_sem, 1);
2014         sema_init(&mds->mds_epoch_sem, 1);
2015         spin_lock_init(&mds->mds_transno_lock);
2016         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
2017         atomic_set(&mds->mds_open_count, 0);
2018         atomic_set(&mds->mds_real_clients, 0);
2019
2020         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
2021                                                 LDLM_NAMESPACE_SERVER);
2022         if (obd->obd_namespace == NULL) {
2023                 mds_cleanup(obd, 0);
2024                 GOTO(err_put, rc = -ENOMEM);
2025         }
2026         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
2027
2028         rc = mds_fs_setup(obd, mnt);
2029         if (rc) {
2030                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
2031                 GOTO(err_ns, rc);
2032         }
2033
2034         rc = llog_start_commit_thread();
2035         if (rc < 0)
2036                 GOTO(err_fs, rc);
2037
2038         if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
2039                 class_uuid_t uuid;
2040
2041                 generate_random_uuid(uuid);
2042                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
2043
2044                 OBD_ALLOC(mds->mds_profile, lcfg->lcfg_inllen3);
2045                 if (mds->mds_profile == NULL)
2046                         GOTO(err_fs, rc = -ENOMEM);
2047
2048                 memcpy(mds->mds_profile, lcfg->lcfg_inlbuf3,
2049                        lcfg->lcfg_inllen3);
2050         }
2051
2052         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2053                            "mds_ldlm_client", &obd->obd_ldlm_client);
2054         obd->obd_replayable = 1;
2055
2056         rc = mds_postsetup(obd);
2057         if (rc)
2058                 GOTO(err_fs, rc);
2059
2060         RETURN(0);
2061
2062 err_fs:
2063         /* No extra cleanup needed for llog_init_commit_thread() */
2064         mds_fs_cleanup(obd, 0);
2065 err_ns:
2066         ldlm_namespace_free(obd->obd_namespace, 0);
2067         obd->obd_namespace = NULL;
2068 err_put:
2069         unlock_kernel();
2070         mntput(mds->mds_vfsmnt);
2071         mds->mds_sb = 0;
2072         lock_kernel();
2073 err_ops:
2074         fsfilt_put_ops(obd->obd_fsops);
2075         return rc;
2076 }
2077
2078 static int mds_postsetup(struct obd_device *obd)
2079 {
2080         struct mds_obd *mds = &obd->u.mds;
2081         int rc = 0;
2082         ENTRY;
2083
2084         rc = obd_llog_setup(obd, &obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT, 
2085                             obd, 0, NULL, &llog_lvfs_ops);
2086         if (rc)
2087                 RETURN(rc);
2088
2089         /* This check for @dumb string is needed to handle mounting MDS 
2090            with smfs. Read lconf:MDSDEV.write_conf() for more detail 
2091            explanation. */
2092         if (mds->mds_profile && strcmp(mds->mds_profile, "dumb")) {
2093                 struct lvfs_run_ctxt saved;
2094                 struct lustre_profile *lprof;
2095                 struct config_llog_instance cfg;
2096
2097                 cfg.cfg_instance = NULL;
2098                 cfg.cfg_uuid = mds->mds_lov_uuid;
2099                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2100                 rc = class_config_process_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
2101                                              mds->mds_profile, &cfg);
2102                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2103                 if (rc)
2104                         GOTO(err_llog, rc);
2105
2106                 lprof = class_get_profile(mds->mds_profile);
2107                 if (lprof == NULL) {
2108                         CERROR("No profile found: %s\n", mds->mds_profile);
2109                         GOTO(err_cleanup, rc = -ENOENT);
2110                 }
2111                 rc = mds_lov_connect(obd, lprof->lp_osc);
2112                 if (rc)
2113                         GOTO(err_cleanup, rc);
2114
2115                 rc = mds_lmv_postsetup(obd);
2116                 if (rc)
2117                         GOTO(err_cleanup, rc);
2118         }
2119
2120         RETURN(rc);
2121
2122 err_cleanup:
2123         mds_lov_clean(obd);
2124 err_llog:
2125         obd_llog_cleanup(llog_get_context(&obd->obd_llogs,
2126                                           LLOG_CONFIG_ORIG_CTXT));
2127         RETURN(rc);
2128 }
2129
2130 int mds_postrecov(struct obd_device *obd)
2131 {
2132         struct mds_obd *mds = &obd->u.mds;
2133         struct llog_ctxt *ctxt;
2134         int rc, item = 0;
2135         ENTRY;
2136
2137         LASSERT(!obd->obd_recovering);
2138         ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
2139         LASSERT(ctxt != NULL);
2140
2141         /* set nextid first, so we are sure it happens */
2142         rc = mds_lov_set_nextid(obd);
2143         if (rc) {
2144                 CERROR("%s: mds_lov_set_nextid failed\n", obd->obd_name);
2145                 GOTO(out, rc);
2146         }
2147
2148         /* clean PENDING dir */
2149         rc = mds_cleanup_orphans(obd);
2150         if (rc < 0)
2151                 GOTO(out, rc);
2152         item = rc;
2153
2154         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
2155                           NULL, NULL, NULL);
2156         if (rc) {
2157                 CERROR("%s: failed at llog_origin_connect: %d\n", 
2158                        obd->obd_name, rc);
2159                 GOTO(out, rc);
2160         }
2161
2162         /* remove the orphaned precreated objects */
2163         rc = mds_lov_clearorphans(mds, NULL /* all OSTs */);
2164         if (rc)
2165                 GOTO(err_llog, rc);
2166
2167 out:
2168         RETURN(rc < 0 ? rc : item);
2169
2170 err_llog:
2171         /* cleanup all llogging subsystems */
2172         rc = obd_llog_finish(obd, &obd->obd_llogs,
2173                              mds->mds_lov_desc.ld_tgt_count);
2174         if (rc)
2175                 CERROR("%s: failed to cleanup llogging subsystems\n",
2176                         obd->obd_name);
2177         goto out;
2178 }
2179
2180 int mds_lov_clean(struct obd_device *obd)
2181 {
2182         struct mds_obd *mds = &obd->u.mds;
2183
2184         if (mds->mds_profile) {
2185                 char * cln_prof;
2186                 struct config_llog_instance cfg;
2187                 struct lvfs_run_ctxt saved;
2188                 int len = strlen(mds->mds_profile) + sizeof("-clean") + 1;
2189
2190                 OBD_ALLOC(cln_prof, len);
2191                 sprintf(cln_prof, "%s-clean", mds->mds_profile);
2192
2193                 cfg.cfg_instance = NULL;
2194                 cfg.cfg_uuid = mds->mds_lov_uuid;
2195
2196                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2197                 class_config_process_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
2198                                           cln_prof, &cfg);
2199                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2200
2201                 OBD_FREE(cln_prof, len);
2202                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2203                 mds->mds_profile = NULL;
2204         }
2205         RETURN(0);
2206 }
2207
2208 int mds_lmv_clean(struct obd_device *obd)
2209 {
2210         struct mds_obd *mds = &obd->u.mds;
2211
2212         if (mds->mds_lmv_name) {
2213                 OBD_FREE(mds->mds_lmv_name, strlen(mds->mds_lmv_name) + 1);
2214                 mds->mds_lmv_name = NULL;
2215         }
2216         RETURN(0);
2217 }
2218
2219 static int mds_precleanup(struct obd_device *obd, int flags)
2220 {
2221         int rc = 0;
2222         ENTRY;
2223
2224         mds_lmv_clean(obd);
2225         mds_lov_disconnect(obd, flags);
2226         mds_lov_clean(obd);
2227         obd_llog_cleanup(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT));
2228         RETURN(rc);
2229 }
2230
2231 static int mds_cleanup(struct obd_device *obd, int flags)
2232 {
2233         struct mds_obd *mds = &obd->u.mds;
2234         ENTRY;
2235
2236         if (mds->mds_sb == NULL)
2237                 RETURN(0);
2238
2239         mds_update_server_data(obd, 1);
2240         if (mds->mds_lov_objids != NULL) {
2241                 OBD_FREE(mds->mds_lov_objids,
2242                          mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
2243         }
2244         mds_fs_cleanup(obd, flags);
2245
2246         unlock_kernel();
2247
2248         /* 2 seems normal on mds, (may_umount() also expects 2
2249           fwiw), but we only see 1 at this point in obdfilter. */
2250         if (atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count) > 2)
2251                 CERROR("%s: mount busy, mnt_count %d != 2\n", obd->obd_name,
2252                        atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count));
2253
2254         mntput(mds->mds_vfsmnt);
2255
2256         mds->mds_sb = 0;
2257
2258         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
2259
2260         spin_lock_bh(&obd->obd_processing_task_lock);
2261         if (obd->obd_recovering) {
2262                 target_cancel_recovery_timer(obd);
2263                 obd->obd_recovering = 0;
2264         }
2265         spin_unlock_bh(&obd->obd_processing_task_lock);
2266
2267         lock_kernel();
2268         dev_clear_rdonly(2);
2269         fsfilt_put_ops(obd->obd_fsops);
2270
2271         RETURN(0);
2272 }
2273
2274 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
2275                                         struct ldlm_lock *new_lock,
2276                                         struct lustre_handle *lockh)
2277 {
2278         struct obd_export *exp = req->rq_export;
2279         struct obd_device *obd = exp->exp_obd;
2280         struct ldlm_request *dlmreq =
2281                 lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
2282         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
2283         struct list_head *iter;
2284
2285         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2286                 return;
2287
2288         l_lock(&obd->obd_namespace->ns_lock);
2289         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2290                 struct ldlm_lock *lock;
2291                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2292                 if (lock == new_lock)
2293                         continue;
2294                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2295                         lockh->cookie = lock->l_handle.h_cookie;
2296                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2297                                   lockh->cookie);
2298                         l_unlock(&obd->obd_namespace->ns_lock);
2299                         return;
2300                 }
2301         }
2302         l_unlock(&obd->obd_namespace->ns_lock);
2303
2304         /* If the xid matches, then we know this is a resent request,
2305          * and allow it. (It's probably an OPEN, for which we don't
2306          * send a lock */
2307         if (req->rq_xid == exp->exp_mds_data.med_mcd->mcd_last_xid)
2308                 return;
2309
2310         /* This remote handle isn't enqueued, so we never received or
2311          * processed this request.  Clear MSG_RESENT, because it can
2312          * be handled like any normal request now. */
2313
2314         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2315
2316         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2317                   remote_hdl.cookie);
2318 }
2319
2320 int intent_disposition(struct ldlm_reply *rep, int flag)
2321 {
2322         if (!rep)
2323                 return 0;
2324         return (rep->lock_policy_res1 & flag);
2325 }
2326
2327 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2328 {
2329         if (!rep)
2330                 return;
2331         rep->lock_policy_res1 |= flag;
2332 }
2333
2334 static int mds_intent_policy(struct ldlm_namespace *ns,
2335                              struct ldlm_lock **lockp, void *req_cookie,
2336                              ldlm_mode_t mode, int flags, void *data)
2337 {
2338         struct ptlrpc_request *req = req_cookie;
2339         struct ldlm_lock *lock = *lockp;
2340         struct ldlm_intent *it;
2341         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2342         struct ldlm_reply *rep;
2343         struct lustre_handle lockh = { 0 };
2344         struct ldlm_lock *new_lock;
2345         int getattr_part = MDS_INODELOCK_UPDATE;
2346         int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
2347                                           sizeof(struct mds_body),
2348                                           mds->mds_max_mdsize,
2349                                           mds->mds_max_cookiesize};
2350         ENTRY;
2351
2352         LASSERT(req != NULL);
2353
2354         if (req->rq_reqmsg->bufcount <= 1) {
2355                 /* No intent was provided */
2356                 int size = sizeof(struct ldlm_reply);
2357                 rc = lustre_pack_reply(req, 1, &size, NULL);
2358                 LASSERT(rc == 0);
2359                 RETURN(0);
2360         }
2361
2362         it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent);
2363         if (it == NULL) {
2364                 CERROR("Intent missing\n");
2365                 RETURN(req->rq_status = -EFAULT);
2366         }
2367
2368         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2369
2370         rc = lustre_pack_reply(req, 3, repsize, NULL);
2371         if (rc)
2372                 RETURN(req->rq_status = rc);
2373
2374         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
2375         intent_set_disposition(rep, DISP_IT_EXECD);
2376
2377         fixup_handle_for_resent_req(req, lock, &lockh);
2378
2379         /* execute policy */
2380         switch ((long)it->opc) {
2381         case IT_OPEN:
2382         case IT_CREAT|IT_OPEN:
2383                 /* XXX swab here to assert that an mds_open reint
2384                  * packet is following */
2385                 rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
2386 #if 0
2387                 /* We abort the lock if the lookup was negative and
2388                  * we did not make it to the OPEN portion */
2389                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2390                         RETURN(ELDLM_LOCK_ABORTED);
2391                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2392                     !intent_disposition(rep, DISP_OPEN_OPEN))
2393 #endif
2394                 /* IT_OPEN may return lock on cross-node dentry
2395                  * that we want to hold during attr retrival -bzzz */
2396                 if (rc != 0 || lockh.cookie == 0)
2397                         RETURN(ELDLM_LOCK_ABORTED);
2398                 break;
2399         case IT_LOOKUP:
2400                 getattr_part = MDS_INODELOCK_LOOKUP;
2401         case IT_CHDIR:
2402         case IT_GETATTR:
2403                 getattr_part |= MDS_INODELOCK_LOOKUP;
2404         case IT_READDIR:
2405                 rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh,
2406                                                          getattr_part);
2407                 /* FIXME: LDLM can set req->rq_status. MDS sets
2408                    policy_res{1,2} with disposition and status.
2409                    - replay: returns 0 & req->status is old status
2410                    - otherwise: returns req->status */
2411                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2412                         rep->lock_policy_res2 = 0;
2413                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2414                     rep->lock_policy_res2)
2415                         RETURN(ELDLM_LOCK_ABORTED);
2416                 if (req->rq_status != 0) {
2417                         LBUG();
2418                         rep->lock_policy_res2 = req->rq_status;
2419                         RETURN(ELDLM_LOCK_ABORTED);
2420                 }
2421                 break;
2422         case IT_UNLINK:
2423                 rc = mds_lock_and_check_slave(offset, req, &lockh);
2424                 if ((rep->lock_policy_res2 = rc)) {
2425                         if (rc == ENOLCK)
2426                                 rep->lock_policy_res2 = 0;
2427                         RETURN(ELDLM_LOCK_ABORTED);
2428                 }
2429                 break;
2430         default:
2431                 CERROR("Unhandled intent "LPD64"\n", it->opc);
2432                 LBUG();
2433         }
2434
2435         /* By this point, whatever function we called above must have either
2436          * filled in 'lockh', been an intent replay, or returned an error.  We
2437          * want to allow replayed RPCs to not get a lock, since we would just
2438          * drop it below anyways because lock replay is done separately by the
2439          * client afterwards.  For regular RPCs we want to give the new lock to
2440          * the client instead of whatever lock it was about to get. */
2441         new_lock = ldlm_handle2lock(&lockh);
2442         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2443                 RETURN(0);
2444
2445         LASSERT(new_lock != NULL);
2446
2447         /* If we've already given this lock to a client once, then we should
2448          * have no readers or writers.  Otherwise, we should have one reader
2449          * _or_ writer ref (which will be zeroed below) before returning the
2450          * lock to a client. */
2451         if (new_lock->l_export == req->rq_export) {
2452                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2453         } else {
2454                 LASSERT(new_lock->l_export == NULL);
2455                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2456         }
2457
2458         *lockp = new_lock;
2459
2460         if (new_lock->l_export == req->rq_export) {
2461                 /* Already gave this to the client, which means that we
2462                  * reconstructed a reply. */
2463                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2464                         MSG_RESENT);
2465                 RETURN(ELDLM_LOCK_REPLACED);
2466         }
2467
2468         /* Fixup the lock to be given to the client */
2469         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
2470         new_lock->l_readers = 0;
2471         new_lock->l_writers = 0;
2472
2473         new_lock->l_export = class_export_get(req->rq_export);
2474         list_add(&new_lock->l_export_chain,
2475                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2476
2477         new_lock->l_blocking_ast = lock->l_blocking_ast;
2478         new_lock->l_completion_ast = lock->l_completion_ast;
2479
2480         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2481                sizeof(lock->l_remote_handle));
2482
2483         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2484
2485         LDLM_LOCK_PUT(new_lock);
2486         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
2487
2488         RETURN(ELDLM_LOCK_REPLACED);
2489 }
2490
2491 int mds_attach(struct obd_device *dev, obd_count len, void *data)
2492 {
2493         struct lprocfs_static_vars lvars;
2494
2495         lprocfs_init_multi_vars(0, &lvars);
2496         return lprocfs_obd_attach(dev, lvars.obd_vars);
2497 }
2498
/*
 * Detach handler for the MDS device; hands @dev to lprocfs_obd_detach()
 * and returns its result.
 */
int mds_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
2503
2504 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
2505 {
2506         struct lprocfs_static_vars lvars;
2507
2508         lprocfs_init_multi_vars(1, &lvars);
2509         return lprocfs_obd_attach(dev, lvars.obd_vars);
2510 }
2511
/*
 * Detach handler for the MDT device; hands @dev to lprocfs_obd_detach()
 * and returns its result.
 */
int mdt_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
2516
2517 static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
2518 {
2519         struct mds_obd *mds = &obd->u.mds;
2520         int rc = 0;
2521         ENTRY;
2522
2523         mds->mds_service =
2524                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2525                                 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
2526                                 mds_handle, "mds", obd->obd_proc_entry);
2527
2528         if (!mds->mds_service) {
2529                 CERROR("failed to start service\n");
2530                 RETURN(-ENOMEM);
2531         }
2532
2533         rc = ptlrpc_start_n_threads(obd, mds->mds_service, MDT_NUM_THREADS,
2534                                     "ll_mdt");
2535         if (rc)
2536                 GOTO(err_thread, rc);
2537
2538         mds->mds_setattr_service =
2539                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2540                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
2541                                 mds_handle, "mds_setattr",
2542                                 obd->obd_proc_entry);
2543         if (!mds->mds_setattr_service) {
2544                 CERROR("failed to start getattr service\n");
2545                 GOTO(err_thread, rc = -ENOMEM);
2546         }
2547
2548         rc = ptlrpc_start_n_threads(obd, mds->mds_setattr_service,
2549                                     MDT_NUM_THREADS, "ll_mdt_attr");
2550         if (rc)
2551                 GOTO(err_thread2, rc);
2552
2553         mds->mds_readpage_service =
2554                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2555                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
2556                                 mds_handle, "mds_readpage",
2557                                 obd->obd_proc_entry);
2558         if (!mds->mds_readpage_service) {
2559                 CERROR("failed to start readpage service\n");
2560                 GOTO(err_thread2, rc = -ENOMEM);
2561         }
2562
2563         rc = ptlrpc_start_n_threads(obd, mds->mds_readpage_service,
2564                                     MDT_NUM_THREADS, "ll_mdt_rdpg");
2565
2566         if (rc)
2567                 GOTO(err_thread3, rc);
2568
2569         RETURN(0);
2570
2571 err_thread3:
2572         ptlrpc_unregister_service(mds->mds_readpage_service);
2573 err_thread2:
2574         ptlrpc_unregister_service(mds->mds_setattr_service);
2575 err_thread:
2576         ptlrpc_unregister_service(mds->mds_service);
2577         return rc;
2578 }
2579
2580 static int mdt_cleanup(struct obd_device *obd, int flags)
2581 {
2582         struct mds_obd *mds = &obd->u.mds;
2583         ENTRY;
2584
2585         ptlrpc_stop_all_threads(mds->mds_readpage_service);
2586         ptlrpc_unregister_service(mds->mds_readpage_service);
2587
2588         ptlrpc_stop_all_threads(mds->mds_setattr_service);
2589         ptlrpc_unregister_service(mds->mds_setattr_service);
2590
2591         ptlrpc_stop_all_threads(mds->mds_service);
2592         ptlrpc_unregister_service(mds->mds_service);
2593
2594         RETURN(0);
2595 }
2596
2597 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2598                                           void *data)
2599 {
2600         struct obd_device *obd = data;
2601         struct ll_fid fid;
2602         fid.id = id;
2603         fid.generation = gen;
2604         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2605 }
2606
2607 static int mds_get_info(struct obd_export *exp, __u32 keylen,
2608                         void *key, __u32 *vallen, void *val)
2609 {
2610         struct obd_device *obd;
2611         struct mds_obd *mds;
2612         ENTRY;
2613
2614         obd = class_exp2obd(exp);
2615         if (obd == NULL) {
2616                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2617                        exp->exp_handle.h_cookie);
2618                 RETURN(-EINVAL);
2619         }
2620
2621         if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
2622                 /*Get log_context handle*/
2623                 unsigned long *llh_handle = val;
2624                 *vallen = sizeof(unsigned long);
2625                 *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
2626                 RETURN(0);
2627         }
2628         if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
2629                 /*Get log_context handle*/
2630                 unsigned long *sb = val;
2631                 *vallen = sizeof(unsigned long);
2632                 *sb = (unsigned long)obd->u.mds.mds_sb;
2633                 RETURN(0);
2634         }
2635
2636         mds = &obd->u.mds;
2637         keylen == strlen("mdsize");
2638         if (keylen && memcmp(key, "mdsize", keylen) == 0) {
2639                 __u32 *mdsize = val;
2640                 *vallen = sizeof(*mdsize);
2641                 *mdsize = mds->mds_max_mdsize;
2642                 RETURN(0);
2643         }
2644
2645         CDEBUG(D_IOCTL, "invalid key\n");
2646         RETURN(-EINVAL);
2647
2648 }
2649 struct lvfs_callback_ops mds_lvfs_ops = {
2650         l_fid2dentry:     mds_lvfs_fid2dentry,
2651 };
2652
/* Prototypes for the o_preprw / o_commitrw methods installed in mds_obd_ops
 * below; their definitions are not in this part of the file. */
int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount,
               struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
               struct niobuf_local *res, struct obd_trans_info *oti);
int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                 int objcount, struct obd_ioobj *obj, int niocount,
                 struct niobuf_local *res, struct obd_trans_info *oti, int rc);
2662
2663 /* use obd ops to offer management infrastructure */
2664 static struct obd_ops mds_obd_ops = {
2665         .o_owner           = THIS_MODULE,
2666         .o_attach          = mds_attach,
2667         .o_detach          = mds_detach,
2668         .o_connect         = mds_connect,
2669         .o_init_export     = mds_init_export,
2670         .o_destroy_export  = mds_destroy_export,
2671         .o_disconnect      = mds_disconnect,
2672         .o_setup           = mds_setup,
2673         .o_precleanup      = mds_precleanup,
2674         .o_cleanup         = mds_cleanup,
2675         .o_postrecov       = mds_postrecov,
2676         .o_statfs          = mds_obd_statfs,
2677         .o_iocontrol       = mds_iocontrol,
2678         .o_create          = mds_obd_create,
2679         .o_destroy         = mds_obd_destroy,
2680         .o_llog_init       = mds_llog_init,
2681         .o_llog_finish     = mds_llog_finish,
2682         .o_notify          = mds_notify,
2683         .o_get_info        = mds_get_info,
2684         .o_set_info        = mds_set_info,
2685         .o_preprw          = mds_preprw, 
2686         .o_commitrw        = mds_commitrw,
2687 };
2688
2689 static struct obd_ops mdt_obd_ops = {
2690         .o_owner           = THIS_MODULE,
2691         .o_attach          = mdt_attach,
2692         .o_detach          = mdt_detach,
2693         .o_setup           = mdt_setup,
2694         .o_cleanup         = mdt_cleanup,
2695         .o_attach          = mdt_attach,
2696         .o_detach          = mdt_detach,
2697 };
2698
2699 static int __init mds_init(void)
2700 {
2701         struct lprocfs_static_vars lvars;
2702
2703         lprocfs_init_multi_vars(0, &lvars);
2704         class_register_type(&mds_obd_ops, NULL, lvars.module_vars,
2705                             LUSTRE_MDS_NAME);
2706         lprocfs_init_multi_vars(1, &lvars);
2707         class_register_type(&mdt_obd_ops, NULL, lvars.module_vars,
2708                             LUSTRE_MDT_NAME);
2709
2710         return 0;
2711 }
2712
2713 static void /*__exit*/ mds_exit(void)
2714 {
2715         class_unregister_type(LUSTRE_MDS_NAME);
2716         class_unregister_type(LUSTRE_MDT_NAME);
2717 }
2718
2719 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2720 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2721 MODULE_LICENSE("GPL");
2722
2723 module_init(mds_init);
2724 module_exit(mds_exit);