Whamcloud - gitweb
- CROW (CReate On Write) (precreation is removed)
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/init.h>
38 #include <linux/obd_class.h>
39 #include <linux/random.h>
40 #include <linux/fs.h>
41 #include <linux/jbd.h>
42 #include <linux/namei.h>
43 #include <linux/ext3_fs.h>
44 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
45 # include <linux/smp_lock.h>
46 # include <linux/buffer_head.h>
47 # include <linux/workqueue.h>
48 # include <linux/mount.h>
49 #else
50 # include <linux/locks.h>
51 #endif
52 #include <linux/obd_lov.h>
53 #include <linux/obd_ost.h>
54 #include <linux/lustre_mds.h>
55 #include <linux/lustre_fsfilt.h>
56 #include <linux/lprocfs_status.h>
57 #include <linux/lustre_commit_confd.h>
58 #include <linux/lustre_acl.h>
59 #include "mds_internal.h"
60 #include <linux/lustre_sec.h>
61
62 static int mds_intent_policy(struct ldlm_namespace *ns,
63                              struct ldlm_lock **lockp, void *req_cookie,
64                              ldlm_mode_t mode, int flags, void *data);
65 static int mds_postsetup(struct obd_device *obd);
66 static int mds_cleanup(struct obd_device *obd, int flags);
67
68
69 /* Assumes caller has already pushed into the kernel filesystem context */
70 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
71                         loff_t offset, int count)
72 {
73         struct ptlrpc_bulk_desc *desc;
74         struct l_wait_info lwi;
75         struct page **pages;
76         int rc = 0, npages, i, tmpcount, tmpsize = 0;
77         ENTRY;
78
79         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
80
81         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
82         OBD_ALLOC(pages, sizeof(*pages) * npages);
83         if (!pages)
84                 GOTO(out, rc = -ENOMEM);
85
86         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
87                                     MDS_BULK_PORTAL);
88         if (desc == NULL)
89                 GOTO(out_free, rc = -ENOMEM);
90
91         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
92                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
93
94                 pages[i] = alloc_pages(GFP_KERNEL, 0);
95                 if (pages[i] == NULL)
96                         GOTO(cleanup_buf, rc = -ENOMEM);
97
98                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
99         }
100
101         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
102                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
103                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
104                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
105                        file->f_dentry->d_inode->i_size);
106
107                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
108                                      kmap(pages[i]), tmpsize, &offset);
109                 kunmap(pages[i]);
110
111                 if (rc != tmpsize)
112                         GOTO(cleanup_buf, rc = -EIO);
113         }
114
115         LASSERT(desc->bd_nob == count);
116
117         rc = ptlrpc_start_bulk_transfer(desc);
118         if (rc)
119                 GOTO(cleanup_buf, rc);
120
121         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
122                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
123                        OBD_FAIL_MDS_SENDPAGE, rc = -EIO);
124                 GOTO(abort_bulk, rc);
125         }
126
127         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
128         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
129         LASSERT (rc == 0 || rc == -ETIMEDOUT);
130
131         if (rc == 0) {
132                 if (desc->bd_success &&
133                     desc->bd_nob_transferred == count)
134                         GOTO(cleanup_buf, rc);
135
136                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
137         }
138
139         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
140                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
141                   desc->bd_nob_transferred, count,
142                   req->rq_export->exp_client_uuid.uuid,
143                   req->rq_export->exp_connection->c_remote_uuid.uuid);
144
145         ptlrpc_fail_export(req->rq_export);
146
147         EXIT;
148  abort_bulk:
149         ptlrpc_abort_bulk (desc);
150  cleanup_buf:
151         for (i = 0; i < npages; i++)
152                 if (pages[i])
153                         __free_pages(pages[i], 0);
154
155         ptlrpc_free_bulk(desc);
156  out_free:
157         OBD_FREE(pages, sizeof(*pages) * npages);
158  out:
159         return rc;
160 }
161
162 extern char *ldlm_lockname[];
163
164 int mds_lock_mode_for_dir(struct obd_device *obd,
165                           struct dentry *dentry, int mode)
166 {
167         int ret_mode = 0, split;
168
169         /* any dir access needs couple locks:
170          * 1) on part of dir we gonna lookup/modify in
171          * 2) on a whole dir to protect it from concurrent splitting
172          *    and to flush client's cache for readdir()
173          * so, for a given mode and dentry this routine decides what
174          * lock mode to use for lock #2:
175          * 1) if caller's gonna lookup in dir then we need to protect
176          *    dir from being splitted only - LCK_CR
177          * 2) if caller's gonna modify dir then we need to protect
178          *    dir from being splitted and to flush cache - LCK_CW
179          * 3) if caller's gonna modify dir and that dir seems ready
180          *    for splitting then we need to protect it from any
181          *    type of access (lookup/modify/split) - LCK_EX -bzzz */
182
183         split = mds_splitting_expected(obd, dentry);
184         
185         /*
186          * it is important to check here only for MDS_NO_SPLITTABLE. The reason
187          * is that MDS_NO_SPLITTABLE means dir is not splittable in principle
188          * and another thread will not split it on the quiet. But if we have
189          * MDS_NO_SPLIT_EXPECTED, this means, that dir may be splitted anytime,
190          * but not now (for current thread) and we should consider that it can
191          * happen soon and go that branch which can yield LCK_EX to protect from
192          * possible splitting.
193          */
194         if (split == MDS_NO_SPLITTABLE) {
195                 /*
196                  * this inode won't be splitted. so we need not to protect from
197                  * just flush client's cache on modification.
198                  */
199                 if (mode == LCK_PW)
200                         ret_mode = LCK_CW;
201                 else
202                         ret_mode = 0;
203         } else {
204                 if (mode == LCK_EX) {
205                         ret_mode = LCK_EX;
206                 } else if (mode == LCK_PR) {
207                         ret_mode = LCK_CR;
208                 } else if (mode == LCK_PW) {
209                         /*
210                          * caller gonna modify directory. We use concurrent
211                          * write lock here to retract client's cache for
212                          * readdir.
213                          */
214                         if (split == MDS_EXPECT_SPLIT) {
215                                 /*
216                                  * splitting possible. serialize any access the
217                                  * idea is that first one seen dir is splittable
218                                  * is given exclusive lock and split
219                                  * directory. caller passes lock mode to
220                                  * mds_try_to_split_dir() and splitting would be
221                                  * done with exclusive lock only -bzzz.
222                                  */
223                                 CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n",
224                                        obd->obd_name,
225                                        (unsigned long)dentry->d_inode->i_ino,
226                                        (unsigned long)dentry->d_inode->i_generation);
227                                 ret_mode = LCK_EX;
228                         } else {
229                                 ret_mode = LCK_CW;
230                         }
231                 }
232         }
233
234         return ret_mode;        
235 }
236
237 /* only valid locked dentries or errors should be returned */
238 struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id,
239                                     struct vfsmount **mnt, int lock_mode,
240                                     struct lustre_handle *lockh, int *mode,
241                                     char *name, int namelen, __u64 lockpart)
242 {
243         struct dentry *de = mds_id2dentry(obd, id, mnt), *retval = de;
244         ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
245         struct ldlm_res_id res_id = { .name = {0} };
246         int flags = 0, rc;
247         ENTRY;
248
249         if (IS_ERR(de))
250                 RETURN(de);
251
252         lockh[1].cookie = 0;
253         res_id.name[0] = id_fid(id);
254         res_id.name[1] = id_group(id);
255         
256 #ifdef S_PDIROPS
257         if (name && IS_PDIROPS(de->d_inode)) {
258                 ldlm_policy_data_t cpolicy =
259                         { .l_inodebits = { MDS_INODELOCK_UPDATE } };
260                 LASSERT(mode != NULL);
261                 *mode = mds_lock_mode_for_dir(obd, de, lock_mode);
262                 if (*mode) {
263                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
264                                               res_id, LDLM_IBITS,
265                                               &cpolicy, *mode, &flags,
266                                               mds_blocking_ast,
267                                               ldlm_completion_ast, NULL, NULL,
268                                               NULL, 0, NULL, lockh + 1);
269                         if (rc != ELDLM_OK) {
270                                 l_dput(de);
271                                 RETURN(ERR_PTR(-ENOLCK));
272                         }
273                 }
274                 flags = 0;
275
276                 res_id.name[2] = full_name_hash((unsigned char *)name, namelen);
277
278                 CDEBUG(D_INFO, "take lock on "DLID4":"LPX64"\n",
279                        OLID4(id), res_id.name[2]);
280         }
281 #else
282 #warning "No PDIROPS support in the kernel"
283 #endif
284         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
285                               LDLM_IBITS, &policy, lock_mode, &flags,
286                               mds_blocking_ast, ldlm_completion_ast,
287                               NULL, NULL, NULL, 0, NULL, lockh);
288         if (rc != ELDLM_OK) {
289                 l_dput(de);
290                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
291 #ifdef S_PDIROPS
292                 if (lockh[1].cookie)
293                         ldlm_lock_decref(lockh + 1, *mode);
294 #endif
295         }
296
297         RETURN(retval);
298 }
299
300 #ifndef DCACHE_DISCONNECTED
301 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
302 #endif
303
304 /* Look up an entry by inode number. This function ONLY returns valid dget'd
305  * dentries with an initialized inode or errors */
306 struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
307                              struct vfsmount **mnt)
308 {
309         unsigned long ino = (unsigned long)id_ino(id);
310         __u32 generation = (__u32)id_gen(id);
311         struct mds_obd *mds = &obd->u.mds;
312         struct dentry *result;
313         struct inode *inode;
314         char idname[32];
315
316         if (ino == 0)
317                 RETURN(ERR_PTR(-ESTALE));
318
319         snprintf(idname, sizeof(idname), "0x%lx", ino);
320
321         CDEBUG(D_DENTRY, "--> mds_id2dentry: ino/gen %lu/%u, sb %p\n",
322                ino, generation, mds->mds_sb);
323
324         /* under ext3 this is neither supposed to return bad inodes nor NULL
325            inodes. */
326         result = ll_lookup_one_len(idname, mds->mds_id_de, 
327                                    strlen(idname));
328         if (IS_ERR(result))
329                 RETURN(result);
330
331         inode = result->d_inode;
332         if (!inode)
333                 RETURN(ERR_PTR(-ENOENT));
334
335         if (is_bad_inode(inode)) {
336                 CERROR("bad inode returned %lu/%u\n",
337                        inode->i_ino, inode->i_generation);
338                 dput(result);
339                 RETURN(ERR_PTR(-ENOENT));
340         }
341
342         /* here we disabled generation check, as root inode i_generation
343          * of cache mds and real mds are different. */
344         if (inode->i_ino != id_ino(&mds->mds_rootid) && generation &&
345             inode->i_generation != generation) {
346                 /* we didn't find the right inode.. */
347                 if (id_group(id) != mds->mds_num) {
348                         CERROR("bad inode %lu found, link: %lu, ct: %d, generation "
349                                "%u != %u, mds %u != %u, request to wrong MDS?\n",
350                                inode->i_ino, (unsigned long)inode->i_nlink,
351                                atomic_read(&inode->i_count), inode->i_generation,
352                                generation, mds->mds_num, (unsigned)id_group(id));
353                 } else {
354                         CERROR("bad inode %lu found, link: %lu, ct: %d, generation "
355                                "%u != %u, inode is recreated while request handled?\n",
356                                inode->i_ino, (unsigned long)inode->i_nlink,
357                                atomic_read(&inode->i_count), inode->i_generation,
358                                generation);
359                 }
360                 dput(result);
361                 RETURN(ERR_PTR(-ENOENT));
362         }
363
364         if (mnt) {
365                 *mnt = mds->mds_vfsmnt;
366                 mntget(*mnt);
367         }
368
369         RETURN(result);
370 }
371
372 static
373 int mds_req_add_idmapping(struct ptlrpc_request *req,
374                           struct mds_export_data *med)
375 {
376         struct mds_req_sec_desc *rsd;
377         struct lustre_sec_desc  *lsd;
378         int rc;
379
380         if (!med->med_remote)
381                 return 0;
382
383         /* maybe we should do it more completely: invalidate the gss ctxt? */
384         if (req->rq_mapped_uid == MDS_IDMAP_NOTFOUND) {
385                 CWARN("didn't find mapped uid\n");
386                 return -EPERM;
387         }
388
389         rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
390         if (!rsd) {
391                 CERROR("Can't unpack security desc\n");
392                 return -EPROTO;
393         }
394
395         lsd = mds_get_lsd(req->rq_mapped_uid);
396         if (!lsd) {
397                 CERROR("can't get LSD(%u), no mapping added\n",
398                        req->rq_mapped_uid);
399                 return -EPERM;
400         }
401
402         rc = mds_idmap_add(med->med_idmap, rsd->rsd_uid, lsd->lsd_uid,
403                            rsd->rsd_gid, lsd->lsd_gid);
404         mds_put_lsd(lsd);
405         return rc;
406 }
407
408 static
409 int mds_req_del_idmapping(struct ptlrpc_request *req,
410                           struct mds_export_data *med)
411 {
412         struct mds_req_sec_desc *rsd;
413         struct lustre_sec_desc  *lsd;
414         int rc;
415
416         if (!med->med_remote)
417                 return 0;
418
419         rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
420         if (!rsd) {
421                 CERROR("Can't unpack security desc\n");
422                 return -EPROTO;
423         }
424
425         LASSERT(req->rq_mapped_uid != -1);
426         lsd = mds_get_lsd(req->rq_mapped_uid);
427         if (!lsd) {
428                 CERROR("can't get LSD(%u), no idmapping deleted\n",
429                        req->rq_mapped_uid);
430                 return -EPERM;
431         }
432
433         rc = mds_idmap_del(med->med_idmap, rsd->rsd_uid, lsd->lsd_uid,
434                            rsd->rsd_gid, lsd->lsd_gid);
435         mds_put_lsd(lsd);
436         return rc;
437 }
438
439 static int mds_init_export_data(struct ptlrpc_request *req,
440                                 struct mds_export_data *med)
441 {
442         struct obd_connect_data *data, *reply;
443         int ask_remote, ask_local;
444         ENTRY;
445
446         data = lustre_msg_buf(req->rq_reqmsg, 5, sizeof(*data));
447         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*data));
448         LASSERT(data && reply);
449
450         if (med->med_initialized) {
451                 CDEBUG(D_SEC, "med already initialized, reconnect?\n");
452                 goto reply;
453         }
454
455         ask_remote = data->ocd_connect_flags & OBD_CONNECT_REMOTE;
456         ask_local = data->ocd_connect_flags & OBD_CONNECT_LOCAL;
457
458         /* currently the policy is simple: satisfy client as possible
459          * as we can.
460          */
461         if (req->rq_auth_uid == -1) {
462                 if (ask_remote)
463                         CWARN("null sec is used, force to be local\n");
464                 med->med_remote = 0;
465         } else {
466                 if (ask_remote) {
467                         if (!req->rq_remote_realm)
468                                 CWARN("local realm asked to be remote\n");
469                         med->med_remote = 1;
470                 } else if (ask_local) {
471                         if (req->rq_remote_realm)
472                                 CWARN("remote realm asked to be local\n");
473                         med->med_remote = 0;
474                 } else
475                         med->med_remote = (req->rq_remote_realm != 0);
476         }
477
478         med->med_nllu = data->ocd_nllu[0];
479         med->med_nllg = data->ocd_nllu[1];
480
481         med->med_initialized = 1;
482 reply:
483         reply->ocd_connect_flags &= ~(OBD_CONNECT_REMOTE | OBD_CONNECT_LOCAL);
484         if (med->med_remote) {
485                 if (!med->med_idmap)
486                         med->med_idmap = mds_idmap_alloc();
487
488                 if (!med->med_idmap)
489                         CERROR("Failed to alloc idmap, following request from "
490                                "this client will be refused\n");
491
492                 reply->ocd_connect_flags |= OBD_CONNECT_REMOTE;
493                 CDEBUG(D_SEC, "set client as remote\n");
494         } else {
495                 reply->ocd_connect_flags |= OBD_CONNECT_LOCAL;
496                 CDEBUG(D_SEC, "set client as local\n");
497         }
498
499         RETURN(0);
500 }
501
502 static void mds_free_export_data(struct mds_export_data *med)
503 {
504         if (!med->med_idmap)
505                 return;
506
507         LASSERT(med->med_remote);
508         mds_idmap_free(med->med_idmap);
509         med->med_idmap = NULL;
510 }
511
512 /* Establish a connection to the MDS.
513  *
514  * This will set up an export structure for the client to hold state data about
515  * that client, like open files, the last operation number it did on the server,
516  * etc.
517  */
518 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
519                        struct obd_uuid *cluuid, struct obd_connect_data *data,
520                        unsigned long flags)
521 {
522         struct mds_export_data *med;
523         struct mds_client_data *mcd;
524         struct obd_export *exp;
525         int rc;
526         ENTRY;
527
528         if (!conn || !obd || !cluuid)
529                 RETURN(-EINVAL);
530
531         /* XXX There is a small race between checking the list and adding a new
532          * connection for the same UUID, but the real threat (list corruption
533          * when multiple different clients connect) is solved.
534          *
535          * There is a second race between adding the export to the list, and
536          * filling in the client data below.  Hence skipping the case of NULL
537          * mcd above.  We should already be controlling multiple connects at the
538          * client, and we can't hold the spinlock over memory allocations
539          * without risk of deadlocking.
540          */
541         rc = class_connect(conn, obd, cluuid);
542         if (rc)
543                 RETURN(rc);
544         exp = class_conn2export(conn);
545         
546         LASSERT(exp != NULL);
547         med = &exp->exp_mds_data;
548
549         OBD_ALLOC(mcd, sizeof(*mcd));
550         if (!mcd) {
551                 CERROR("%s: out of memory for client data.\n",
552                         obd->obd_name);
553                 GOTO(out, rc = -ENOMEM);
554         }
555
556         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
557         med->med_mcd = mcd;
558
559         rc = mds_client_add(obd, &obd->u.mds, med, -1);
560         if (rc)
561                 GOTO(out, rc);
562        
563         EXIT;
564 out:
565         if (rc) {
566                 if (mcd)
567                         OBD_FREE(mcd, sizeof(*mcd));
568                 class_disconnect(exp, 0);
569         } else {
570                 class_export_put(exp);
571         }
572         return rc;
573 }
574
575 static int mds_connect_post(struct obd_export *exp, unsigned initial,
576                             unsigned long flags)
577 {
578         struct obd_device *obd = exp->exp_obd;
579         struct mds_obd *mds = &obd->u.mds;
580         struct mds_export_data *med;
581         struct mds_client_data *mcd;
582         int rc = 0;
583         ENTRY;
584
585         med = &exp->exp_mds_data;
586         mcd = med->med_mcd;
587
588         if (initial) {
589                 /* some one reconnect initially, we have to reset
590                  * data existing export can have. bug 6102 */
591                 if (mcd->mcd_last_xid != 0)
592                         CDEBUG(D_HA, "initial reconnect to existing export\n");
593                 mcd->mcd_last_transno = 0;
594                 mcd->mcd_last_xid = 0;
595                 mcd->mcd_last_result = 0;
596                 mcd->mcd_last_data = 0;
597         }
598
599         if (!(flags & OBD_OPT_MDS_CONNECTION)) {
600                 if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
601                         atomic_inc(&mds->mds_real_clients);
602                         CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",
603                                obd->obd_name, exp->exp_client_uuid.uuid,
604                                atomic_read(&mds->mds_real_clients));
605                         exp->exp_flags |= OBD_OPT_REAL_CLIENT;
606                 }
607                 if (mds->mds_md_name)
608                         rc = mds_md_connect(obd, mds->mds_md_name);
609         }
610         RETURN(rc);
611 }
612
613 static int mds_init_export(struct obd_export *exp)
614 {
615         struct mds_export_data *med = &exp->exp_mds_data;
616
617         INIT_LIST_HEAD(&med->med_open_head);
618         spin_lock_init(&med->med_open_lock);
619         return 0;
620 }
621
622 static int mds_destroy_export(struct obd_export *export)
623 {
624         struct obd_device *obd = export->exp_obd;
625         struct mds_export_data *med = &export->exp_mds_data;
626         struct lvfs_run_ctxt saved;
627         int rc = 0;
628         ENTRY;
629
630         mds_free_export_data(med);
631         target_destroy_export(export);
632
633         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
634                 GOTO(out, 0);
635
636         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
637
638         /* Close any open files (which may also cause orphan unlinking). */
639         spin_lock(&med->med_open_lock);
640         while (!list_empty(&med->med_open_head)) {
641                 struct list_head *tmp = med->med_open_head.next;
642                 struct mds_file_data *mfd =
643                         list_entry(tmp, struct mds_file_data, mfd_list);
644                 struct lustre_id sid;
645                 
646                 BDEVNAME_DECLARE_STORAGE(btmp);
647
648                 /* bug 1579: fix force-closing for 2.5 */
649                 struct dentry *dentry = mfd->mfd_dentry;
650
651                 list_del(&mfd->mfd_list);
652                 spin_unlock(&med->med_open_lock);
653
654                 down(&dentry->d_inode->i_sem);
655                 rc = mds_read_inode_sid(obd, dentry->d_inode, &sid);
656                 up(&dentry->d_inode->i_sem);
657                 if (rc) {
658                         CERROR("Can't read inode self id, inode %lu, "
659                                "rc %d\n", dentry->d_inode->i_ino, rc);
660                         memset(&sid, 0, sizeof(sid));
661                 }
662
663                 /* If you change this message, be sure to update
664                  * replay_single:test_46 */
665                 CERROR("force closing client file handle for %.*s (%s:"
666                        DLID4")\n", dentry->d_name.len, dentry->d_name.name,
667                        ll_bdevname(dentry->d_inode->i_sb, btmp),
668                        OLID4(&sid));
669                 
670                 /* child inode->i_alloc_sem protects orphan_dec_test and
671                  * is_orphan race, mds_mfd_close drops it */
672                 DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode);
673                 rc = mds_mfd_close(NULL, 0, obd, mfd,
674                                    !(export->exp_flags & OBD_OPT_FAILOVER));
675                 if (rc)
676                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
677                 spin_lock(&med->med_open_lock);
678         }
679         spin_unlock(&med->med_open_lock);
680         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
681
682         EXIT;
683 out:
684         mds_client_free(export, !(export->exp_flags & OBD_OPT_FAILOVER));
685         return rc;
686 }
687
688 static int mds_disconnect(struct obd_export *exp, unsigned long flags)
689 {
690         unsigned long irqflags;
691         struct obd_device *obd;
692         struct mds_obd *mds;
693         int rc;
694         ENTRY;
695
696         LASSERT(exp != NULL);
697         obd = class_exp2obd(exp);
698         if (obd == NULL) {
699                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
700                        exp->exp_handle.h_cookie);
701                 RETURN(-EINVAL);
702         }
703         mds = &obd->u.mds;
704
705         /*
706          * suppress any inter-mds requests durring disconnecting lmv if this is
707          * detected --force mode. This is needed to avoid endless recovery.
708          */
709         if (atomic_read(&mds->mds_real_clients) > 0 &&
710             !(exp->exp_flags & OBD_OPT_REAL_CLIENT))
711                 flags |= OBD_OPT_FORCE;
712                                                                                               
713         if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)
714             && !atomic_read(&mds->mds_real_clients)) {
715                 /* there was no client at all */
716                 mds_md_disconnect(obd, flags);
717         }
718
719         if ((exp->exp_flags & OBD_OPT_REAL_CLIENT)
720             && atomic_dec_and_test(&mds->mds_real_clients)) {
721                 /* time to drop LMV connections */
722                 CDEBUG(D_OTHER, "%s: last real client %s disconnected.  "
723                        "Disconnnect from LMV now\n",
724                        obd->obd_name, exp->exp_client_uuid.uuid);
725                 mds_md_disconnect(obd, flags);
726         }
727
728         spin_lock_irqsave(&exp->exp_lock, irqflags);
729         exp->exp_flags = flags;
730         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
731
732         /* disconnect early so that clients can't keep using export */
733         rc = class_disconnect(exp, flags);
734         ldlm_cancel_locks_for_export(exp);
735
736         /* complete all outstanding replies */
737         spin_lock_irqsave(&exp->exp_lock, irqflags);
738         while (!list_empty(&exp->exp_outstanding_replies)) {
739                 struct ptlrpc_reply_state *rs =
740                         list_entry(exp->exp_outstanding_replies.next,
741                                    struct ptlrpc_reply_state, rs_exp_list);
742                 struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
743
744                 spin_lock(&svc->srv_lock);
745                 list_del_init(&rs->rs_exp_list);
746                 ptlrpc_schedule_difficult_reply(rs);
747                 spin_unlock(&svc->srv_lock);
748         }
749         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
750         RETURN(rc);
751 }
752
753 static int mds_getstatus(struct ptlrpc_request *req)
754 {
755         struct mds_obd *mds = mds_req2mds(req);
756         struct mds_body *body;
757         int rc, size;
758         ENTRY;
759
760         size = sizeof(*body);
761         
762         rc = lustre_pack_reply(req, 1, &size, NULL);
763         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
764                 CERROR("mds: out of memory for message: size=%d\n", size);
765                 req->rq_status = -ENOMEM;       /* superfluous? */
766                 RETURN(-ENOMEM);
767         }
768
769         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
770         body->valid |= OBD_MD_FID;
771         
772         memcpy(&body->id1, &mds->mds_rootid, sizeof(body->id1));
773
774         /*
775          * the last_committed and last_xid fields are filled in for all replies
776          * already - no need to do so here also.
777          */
778         RETURN(0);
779 }
780
781 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
782                      void *data, int flag)
783 {
784         int do_ast;
785         ENTRY;
786
787         if (flag == LDLM_CB_CANCELING) {
788                 /* Don't need to do anything here. */
789                 RETURN(0);
790         }
791
792         /* XXX layering violation!  -phil */
793         l_lock(&lock->l_resource->lr_namespace->ns_lock);
794         
795         /*
796          * get this: if mds_blocking_ast is racing with mds_intent_policy, such
797          * that mds_blocking_ast is called just before l_i_p takes the ns_lock,
798          * then by the time we get the lock, we might not be the correct
799          * blocking function anymore.  So check, and return early, if so.
800          */
801         if (lock->l_blocking_ast != mds_blocking_ast) {
802                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
803                 RETURN(0);
804         }
805
806         lock->l_flags |= LDLM_FL_CBPENDING;
807         do_ast = (!lock->l_readers && !lock->l_writers);
808         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
809
810         if (do_ast) {
811                 struct lustre_handle lockh;
812                 int rc;
813
814                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
815                 ldlm_lock2handle(lock, &lockh);
816                 rc = ldlm_cli_cancel(&lockh);
817                 if (rc < 0)
818                         CERROR("ldlm_cli_cancel: %d\n", rc);
819         } else {
820                 LDLM_DEBUG(lock, "Lock still has references, will be "
821                            "cancelled later");
822         }
823         RETURN(0);
824 }
825
826 static int mds_convert_md(struct obd_device *obd, struct inode *inode,
827                           void *md, int size, int mea)
828 {
829         int rc = size;
830         
831         if (S_ISREG(inode->i_mode)) {
832                 rc = mds_convert_lov_ea(obd, inode, md, size);
833         } else if (S_ISDIR(inode->i_mode)) {
834                 if (mea) {
835                         rc = mds_convert_mea_ea(obd, inode, md, size);
836                 } else {
837                         rc = mds_convert_lov_ea(obd, inode, md, size);
838                 }
839                 if (rc == -EINVAL) {
840                         CERROR("Invalid EA format (nor LOV or MEA) "
841                                "is detected. Inode %lu/%u\n",
842                                inode->i_ino, inode->i_generation);
843                 }
844         }
845         return rc;
846 }
847
848 int mds_get_md(struct obd_device *obd, struct inode *inode,
849                void *md, int *size, int lock, int mea)
850 {
851         int lmm_size;
852         int rc = 0;
853         ENTRY;
854
855         if (lock)
856                 down(&inode->i_sem);
857
858         rc = fsfilt_get_md(obd, inode, md, *size,
859                            (mea ? EA_MEA : EA_LOV));
860         if (rc < 0) {
861                 CERROR("Error %d reading eadata for ino %lu\n",
862                        rc, inode->i_ino);
863         } else if (rc > 0) {
864                 lmm_size = rc;
865                 rc = mds_convert_md(obd, inode, md,
866                                     lmm_size, mea);
867                 if (rc == 0) {
868                         *size = lmm_size;
869                         rc = lmm_size;
870                 } else if (rc > 0) {
871                         *size = rc;
872                 }
873         }
874         if (lock)
875                 up(&inode->i_sem);
876
877         RETURN(rc);
878 }
879
880 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
881  * Call with lock=0 if the caller has already taken the i_sem. */
882 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
883                 struct mds_body *body, struct inode *inode, int lock, int mea)
884 {
885         struct mds_obd *mds = &obd->u.mds;
886         int rc, lmm_size;
887         void *lmm;
888         ENTRY;
889
890         lmm = lustre_msg_buf(msg, offset, 0);
891         if (lmm == NULL) {
892                 /* Some problem with getting eadata when I sized the reply
893                  * buffer... */
894                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
895                        inode->i_ino);
896                 RETURN(0);
897         }
898         lmm_size = msg->buflens[offset];
899
900         /* I don't really like this, but it is a sanity check on the client
901          * MD request.  However, if the client doesn't know how much space
902          * to reserve for the MD, it shouldn't be bad to have too much space.
903          */
904         if (lmm_size > mds->mds_max_mdsize) {
905                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
906                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
907                 // RETURN(-EINVAL);
908         }
909
910         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock, mea);
911         if (rc > 0) {
912                 body->valid |= S_ISDIR(inode->i_mode) ?
913                         OBD_MD_FLDIREA : OBD_MD_FLEASIZE;
914                 
915                 if (mea)
916                         body->valid |= OBD_MD_MEA;
917                 
918                 body->eadatasize = lmm_size;
919                 rc = 0;
920         }
921
922         RETURN(rc);
923 }
924
925 int mds_pack_link(struct dentry *dentry, struct ptlrpc_request *req,
926                   struct mds_body *repbody, int reply_off)
927 {
928         struct inode *inode = dentry->d_inode;
929         char *symname;
930         int len, rc;
931         ENTRY;
932
933         symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
934         LASSERT(symname != NULL);
935         len = req->rq_repmsg->buflens[reply_off + 1];
936         
937         rc = inode->i_op->readlink(dentry, symname, len);
938         if (rc < 0) {
939                 CERROR("readlink failed: %d\n", rc);
940         } else if (rc != len - 1) {
941                 CERROR ("Unexpected readlink rc %d: expecting %d\n",
942                         rc, len - 1);
943                 rc = -EINVAL;
944         } else {
945                 CDEBUG(D_INODE, "read symlink dest %s\n", symname);
946                 repbody->valid |= OBD_MD_LINKNAME;
947                 repbody->eadatasize = rc + 1;
948                 symname[rc] = 0;        /* NULL terminate */
949                 rc = 0;
950         }
951
952         RETURN(rc);
953 }
954
955 int mds_pack_ea(struct dentry *dentry, struct ptlrpc_request *req,
956                 struct mds_body *repbody, int req_off, int reply_off)
957 {
958         struct inode *inode = dentry->d_inode;
959         char *ea_name;
960         void *value = NULL;
961         int len, rc;
962         ENTRY;
963
964         ea_name = lustre_msg_string(req->rq_reqmsg, req_off + 1, 0);
965         len = req->rq_repmsg->buflens[reply_off + 1];
966         if (len != 0)
967                 value = lustre_msg_buf(req->rq_repmsg, reply_off + 1, len);
968
969         rc = -EOPNOTSUPP;
970         if (inode->i_op && inode->i_op->getxattr) 
971                 rc = inode->i_op->getxattr(dentry, ea_name, value, len);
972
973         if (rc < 0) {
974                 if (rc != -ENODATA && rc != -EOPNOTSUPP)
975                         CERROR("getxattr failed: %d", rc);
976         } else {
977                 repbody->valid |= OBD_MD_FLEA;
978                 repbody->eadatasize = rc;
979                 rc = 0;
980         }
981
982         RETURN(rc);        
983 }
984
985 int mds_pack_ealist(struct dentry *dentry, struct ptlrpc_request *req,
986                     struct mds_body *repbody, int reply_off)
987 {
988         struct inode *inode = dentry->d_inode;        
989         void *value = NULL;
990         int len, rc;
991         ENTRY;
992
993         len = req->rq_repmsg->buflens[reply_off + 1];
994         if (len != 0)
995                 value = lustre_msg_buf(req->rq_repmsg, reply_off + 1, len);
996
997         rc = -EOPNOTSUPP;
998         if (inode->i_op && inode->i_op->getxattr) 
999                 rc = inode->i_op->listxattr(dentry, value, len);
1000
1001         if (rc < 0) {
1002                 CERROR("listxattr failed: %d", rc);
1003         } else {
1004                 repbody->valid |= OBD_MD_FLEALIST;
1005                 repbody->eadatasize = rc;
1006                 rc = 0;
1007         }
1008         RETURN(rc);
1009 }
1010
1011 int mds_pack_acl(struct obd_device *obd, struct lustre_msg *repmsg, int offset,
1012                  struct mds_body *body, struct inode *inode)
1013 {
1014         struct dentry de = { .d_inode = inode };
1015         __u32 buflen, *sizep;
1016         void *buf;
1017         int size;
1018         ENTRY;
1019
1020         if (!inode->i_op->getxattr)
1021                 RETURN(0);
1022
1023         buflen = repmsg->buflens[offset + 1];
1024         buf = lustre_msg_buf(repmsg, offset + 1, buflen);
1025
1026         size = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, buf, buflen);
1027         if (size == -ENODATA || size == -EOPNOTSUPP)
1028                 RETURN(0);
1029         if (size < 0)
1030                 RETURN(size);
1031         LASSERT(size);
1032
1033         sizep = lustre_msg_buf(repmsg, offset, 4);
1034         if (!sizep) {
1035                 CERROR("can't locate returned acl size buf\n");
1036                 RETURN(-EPROTO);
1037         }
1038
1039         *sizep = cpu_to_le32(size);
1040         body->valid |= OBD_MD_FLACL_ACCESS;
1041
1042         RETURN(0);
1043 }
1044
1045 /* 
1046  * here we take simple rule: once uid/fsuid is root, we also squash
1047  * the gid/fsgid, don't care setuid/setgid attributes.
1048  */
1049 int mds_squash_root(struct mds_obd *mds, struct mds_req_sec_desc *rsd,
1050                     ptl_nid_t *peernid)
1051 {
1052         if (!mds->mds_squash_uid || *peernid == mds->mds_nosquash_nid)
1053                 return 0;
1054
1055         if (rsd->rsd_uid && rsd->rsd_fsuid)
1056                 return 0;
1057
1058         CDEBUG(D_SEC, "squash req from "LPX64":"
1059                "(%u:%u-%u:%u/%x)=>(%u:%u-%u:%u/%x)\n", *peernid,
1060                 rsd->rsd_uid, rsd->rsd_gid,
1061                 rsd->rsd_fsuid, rsd->rsd_fsgid, rsd->rsd_cap,
1062                 rsd->rsd_uid ? rsd->rsd_uid : mds->mds_squash_uid,
1063                 rsd->rsd_uid ? rsd->rsd_gid : mds->mds_squash_gid,
1064                 rsd->rsd_fsuid ? rsd->rsd_fsuid : mds->mds_squash_uid,
1065                 rsd->rsd_fsuid ? rsd->rsd_fsgid : mds->mds_squash_gid,
1066                 rsd->rsd_cap & ~CAP_FS_MASK);
1067
1068         if (rsd->rsd_uid == 0) {
1069                 rsd->rsd_uid = mds->mds_squash_uid;
1070                 rsd->rsd_gid = mds->mds_squash_gid;
1071         }
1072         if (rsd->rsd_fsuid == 0) {
1073                 rsd->rsd_fsuid = mds->mds_squash_uid;
1074                 rsd->rsd_fsgid = mds->mds_squash_gid;
1075         }
1076         rsd->rsd_cap &= ~CAP_FS_MASK;
1077
1078         return 1;
1079 }
1080
1081 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
1082                                 struct ptlrpc_request *req, int req_off,
1083                                 struct mds_body *reqbody, int reply_off)
1084 {
1085         struct mds_export_data *med = &req->rq_export->u.eu_mds_data;
1086         struct inode *inode = dentry->d_inode;
1087         struct mds_body *body;
1088         int rc = 0;
1089         ENTRY;
1090
1091         if (inode == NULL && !(dentry->d_flags & DCACHE_CROSS_REF))
1092                 RETURN(-ENOENT);
1093
1094         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
1095         LASSERT(body != NULL);                 /* caller prepped reply */
1096
1097         if (dentry->d_flags & DCACHE_CROSS_REF) {
1098                 mds_pack_dentry2body(obd, body, dentry,
1099                                      (reqbody->valid & OBD_MD_FID) ? 1 : 0);
1100                 CDEBUG(D_OTHER, "cross reference: "DLID4"\n",
1101                        OLID4(&body->id1));
1102                 RETURN(0);
1103         }
1104         
1105         mds_pack_inode2body(obd, body, inode,
1106                             (reqbody->valid & OBD_MD_FID) ? 1 : 0);
1107
1108         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
1109             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
1110             
1111                 /* guessing what kind og attribute do we need. */
1112                 int is_mea = (S_ISDIR(inode->i_mode) && 
1113                     (reqbody->valid & OBD_MD_MEA) != 0);
1114                 
1115                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1, 
1116                                  body, inode, 1, is_mea);
1117
1118                 /* if we have LOV EA data, the OST holds size, atime, mtime. */
1119                 if (!(body->valid & OBD_MD_FLEASIZE) &&
1120                     !(body->valid & OBD_MD_FLDIREA))
1121                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1122                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
1123         } else if (S_ISLNK(inode->i_mode) &&
1124                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
1125                 rc = mds_pack_link(dentry, req, body, reply_off);
1126         } else if (reqbody->valid & OBD_MD_FLEA) {
1127                 rc = mds_pack_ea(dentry, req, body, req_off, reply_off);
1128         } else if (reqbody->valid & OBD_MD_FLEALIST) {
1129                 rc = mds_pack_ealist(dentry, req, body, reply_off);
1130         }
1131         
1132         if (reqbody->valid & OBD_MD_FLACL_ACCESS) {
1133                 int inc = (reqbody->valid & OBD_MD_FLEASIZE) ? 2 : 1;
1134                 rc = mds_pack_acl(obd, req->rq_repmsg, reply_off + inc, 
1135                                   body, inode);
1136         }                
1137
1138         if (rc == 0)
1139                 mds_body_do_reverse_map(med, body);
1140
1141         RETURN(rc);
1142 }
1143
1144 static int mds_getattr_pack_msg_cf(struct ptlrpc_request *req,
1145                                    struct dentry *dentry,
1146                                    int offset)
1147 {
1148         int rc = 0, size[1] = {sizeof(struct mds_body)};
1149         ENTRY;
1150
1151         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
1152                 CERROR("failed MDS_GETATTR_PACK test\n");
1153                 req->rq_status = -ENOMEM;
1154                 RETURN(-ENOMEM);
1155         }
1156
1157         rc = lustre_pack_reply(req, 1, size, NULL);
1158         if (rc) {
1159                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
1160                 GOTO(out, req->rq_status = rc);
1161         }
1162
1163         EXIT;
1164 out:
1165         return rc;
1166 }
1167
1168 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct dentry *de,
1169                                 int offset)
1170 {
1171         struct inode *inode = de->d_inode;
1172         struct mds_obd *mds = mds_req2mds(req);
1173         struct mds_body *body;
1174         int rc = 0, size[4] = {sizeof(*body)}, bufcount = 1;
1175         ENTRY;
1176
1177         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
1178         LASSERT(body != NULL);                 /* checked by caller */
1179         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
1180
1181         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
1182             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
1183                 int rc;
1184                 
1185                 down(&inode->i_sem);
1186                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
1187                                    ((body->valid & OBD_MD_MEA) ? EA_MEA : EA_LOV));
1188                 up(&inode->i_sem);
1189                 if (rc < 0) {
1190                         if (rc != -ENODATA && rc != -EOPNOTSUPP)
1191                                 CERROR("error getting inode %lu MD: rc = %d\n",
1192                                        inode->i_ino, rc);
1193                         size[bufcount] = 0;
1194                 } else if (rc > mds->mds_max_mdsize) {
1195                         size[bufcount] = 0;
1196                         CERROR("MD size %d larger than maximum possible %u\n",
1197                                rc, mds->mds_max_mdsize);
1198                 } else {
1199                         size[bufcount] = rc;
1200                 }
1201                 bufcount++;
1202         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
1203                 if (inode->i_size + 1 != body->eadatasize)
1204                         CERROR("symlink size: %Lu, reply space: %d\n",
1205                                inode->i_size + 1, body->eadatasize);
1206                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
1207                 bufcount++;
1208                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
1209                        inode->i_size + 1, body->eadatasize);
1210         } else if ((body->valid & OBD_MD_FLEA)) {
1211                 char *ea_name = lustre_msg_string(req->rq_reqmsg, 
1212                                                   offset + 1, 0);
1213                 rc = -EOPNOTSUPP;
1214                 if (inode->i_op && inode->i_op->getxattr) 
1215                         rc = inode->i_op->getxattr(de, ea_name, NULL, 0);
1216                 
1217                 if (rc < 0) {
1218                         if (rc != -ENODATA && rc != -EOPNOTSUPP)
1219                                 CERROR("error getting inode %lu EA: rc = %d\n",
1220                                        inode->i_ino, rc);
1221                         size[bufcount] = 0;
1222                 } else {
1223                         size[bufcount] = min_t(int, body->eadatasize, rc);
1224                 }
1225                 bufcount++;
1226         } else if (body->valid & OBD_MD_FLEALIST) {
1227                 rc = -EOPNOTSUPP;
1228                 if (inode->i_op && inode->i_op->getxattr) 
1229                         rc = inode->i_op->listxattr(de, NULL, 0);
1230
1231                 if (rc < 0) {
1232                         if (rc != -ENODATA && rc != -EOPNOTSUPP)
1233                                 CERROR("error getting inode %lu EA: rc = %d\n",
1234                                        inode->i_ino, rc);
1235                         size[bufcount] = 0;
1236                 } else {
1237                         size[bufcount] = min_t(int, body->eadatasize, rc);
1238                 }
1239                 bufcount++;
1240         }
1241         
1242         /* may co-exist with OBD_MD_FLEASIZE */
1243         if (body->valid & OBD_MD_FLACL_ACCESS) {
1244                 size[bufcount++] = 4;
1245                 size[bufcount++] = xattr_acl_size(LL_ACL_MAX_ENTRIES);
1246         }
1247
1248         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
1249                 CERROR("failed MDS_GETATTR_PACK test\n");
1250                 req->rq_status = -ENOMEM;
1251                 GOTO(out, rc = -ENOMEM);
1252         }
1253
1254         rc = lustre_pack_reply(req, bufcount, size, NULL);
1255         if (rc) {
1256                 CERROR("out of memory\n");
1257                 GOTO(out, req->rq_status = rc);
1258         }
1259
1260         EXIT;
1261  out:
1262         return rc;
1263 }
1264
1265 int mds_check_mds_num(struct obd_device *obd, struct inode *inode,
1266                       char *name, int namelen)
1267 {
1268         struct mea *mea = NULL;
1269         int mea_size, rc = 0;
1270         ENTRY;
1271         
1272         rc = mds_md_get_attr(obd, inode, &mea, &mea_size);
1273         if (rc)
1274                 RETURN(rc);
1275         if (mea != NULL) {
1276                 /*
1277                  * dir is already splitted, check if requested filename should
1278                  * live at this MDS or at another one.
1279                  */
1280                 int i = mea_name2idx(mea, name, namelen - 1);
1281                 if (mea->mea_master != id_group(&mea->mea_ids[i])) {
1282                         CDEBUG(D_OTHER,
1283                                "inapropriate MDS(%d) for %s. should be "
1284                                "%lu(%d)\n", mea->mea_master, name, 
1285                                (unsigned long)id_group(&mea->mea_ids[i]), i);
1286                         rc = -ERESTART;
1287                 }
1288         }
1289
1290         if (mea)
1291                 OBD_FREE(mea, mea_size);
1292         RETURN(rc);
1293 }
1294
1295 static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
1296                             struct lustre_handle *child_lockh, int child_part)
1297 {
1298         struct obd_device *obd = req->rq_export->exp_obd;
1299         struct mds_obd *mds = &obd->u.mds;
1300         struct ldlm_reply *rep = NULL;
1301         struct lvfs_run_ctxt saved;
1302         struct mds_req_sec_desc *rsd;
1303         struct mds_body *body;
1304         struct dentry *dparent = NULL, *dchild = NULL;
1305         struct lvfs_ucred uc = {NULL, NULL,};
1306         struct lustre_handle parent_lockh[2] = {{0}, {0}};
1307         unsigned int namesize;
1308         int rc = 0, cleanup_phase = 0, resent_req = 0, update_mode, reply_offset;
1309         char *name = NULL;
1310         ENTRY;
1311
1312         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
1313         MD_COUNTER_INCREMENT(obd, getattr_lock);
1314
1315         rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
1316         if (!rsd) {
1317                 CERROR("Can't unpack security desc\n");
1318                 RETURN(-EFAULT);
1319         }
1320
1321         /* swab now, before anyone looks inside the request. */
1322         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1323                                   lustre_swab_mds_body);
1324         if (body == NULL) {
1325                 CERROR("Can't swab mds_body\n");
1326                 GOTO(cleanup, rc = -EFAULT);
1327         }
1328
1329         LASSERT_REQSWAB(req, offset + 1);
1330         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
1331         if (name == NULL) {
1332                 CERROR("Can't unpack name\n");
1333                 GOTO(cleanup, rc = -EFAULT);
1334         }
1335         namesize = req->rq_reqmsg->buflens[offset + 1];
1336
1337         /* namesize less than 2 means we have empty name, probably came from
1338            revalidate by cfid, so no point in having name to be set */
1339         if (namesize <= 1)
1340                 name = NULL;
1341
1342         LASSERT (offset == 1 || offset == 3);
1343         if (offset == 3) {
1344                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1345                 reply_offset = 1;
1346         } else {
1347                 reply_offset = 0;
1348         }
1349
1350         rc = mds_init_ucred(&uc, req, rsd);
1351         if (rc) {
1352                 GOTO(cleanup, rc);
1353         }
1354
1355         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1356         cleanup_phase = 1; /* kernel context */
1357         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
1358
1359         LASSERT(namesize > 0);
1360         if (child_lockh->cookie != 0) {
1361                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
1362                 resent_req = 1;
1363         }
1364 #if HAVE_LOOKUP_RAW
1365         if (body->valid == OBD_MD_FLID) {
1366                 struct mds_body *mds_reply;
1367                 int size = sizeof(*mds_reply);
1368                 struct inode *dir;
1369                 ino_t inum;
1370
1371                 dparent = mds_id2dentry(obd, &body->id1, NULL);
1372                 if (IS_ERR(dparent)) {
1373                         rc = PTR_ERR(dparent);
1374                         GOTO(cleanup, rc);
1375                 }
1376                 /*
1377                  * the user requested ONLY the inode number, so do a raw lookup.
1378                  */
1379                 rc = lustre_pack_reply(req, 1, &size, NULL);
1380                 if (rc) {
1381                         CERROR("out of memory\n");
1382                         l_dput(dparent);
1383                         GOTO(cleanup, rc);
1384                 }
1385                 dir  = dparent->d_inode;
1386                 LASSERT(dir->i_op->lookup_raw != NULL);
1387                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
1388                 l_dput(dparent);
1389                 mds_reply = lustre_msg_buf(req->rq_repmsg, 0,
1390                                            sizeof(*mds_reply));
1391
1392                 id_ino(&mds_reply->id1) = inum;
1393                 mds_reply->valid = OBD_MD_FLID;
1394                 GOTO(cleanup, rc);
1395         }
1396 #endif
1397         if (resent_req == 0) {
1398                 LASSERT(id_fid(&body->id1) != 0);
1399                 if (name) {
1400                         rc = mds_get_parent_child_locked(obd, mds, &body->id1,
1401                                                          parent_lockh, &dparent,
1402                                                          LCK_PR, 
1403                                                          MDS_INODELOCK_UPDATE,
1404                                                          &update_mode, 
1405                                                          name, namesize,
1406                                                          child_lockh, &dchild, 
1407                                                          LCK_PR, child_part);
1408                         if (rc)
1409                                 GOTO(cleanup, rc);
1410                 
1411                         cleanup_phase = 2; /* dchild, dparent, locks */
1412                         
1413                         /*
1414                          * let's make sure this name should leave on this mds
1415                          * node.
1416                          */
1417                         rc = mds_check_mds_num(obd, dparent->d_inode, name, namesize);
1418                         if (rc)
1419                                 GOTO(cleanup, rc);
1420                 } else {
1421                         /* we have no dentry here, drop LOOKUP bit */
1422                         /* FIXME: we need MDS_INODELOCK_LOOKUP or not. */
1423                         child_part &= ~MDS_INODELOCK_LOOKUP;
1424                         CDEBUG(D_OTHER, "%s: retrieve attrs for "DLID4"\n",
1425                                obd->obd_name, OLID4(&body->id1));
1426
1427                         dchild = mds_id2locked_dentry(obd, &body->id1, NULL,
1428                                                       LCK_PR, parent_lockh,
1429                                                       &update_mode,
1430                                                       NULL, 0, 
1431                                                       MDS_INODELOCK_UPDATE);
1432                         if (IS_ERR(dchild)) {
1433                                 CERROR("can't find inode with id "DLID4", err = %d\n", 
1434                                        OLID4(&body->id1), (int)PTR_ERR(dchild));
1435                                 GOTO(cleanup, rc = PTR_ERR(dchild));
1436                         }
1437                         memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
1438                 }
1439         } else {
1440                 struct ldlm_lock *granted_lock;
1441
1442                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
1443                 granted_lock = ldlm_handle2lock(child_lockh);
1444
1445                 LASSERTF(granted_lock != NULL, LPU64"/%lu lockh "LPX64"\n",
1446                          id_fid(&body->id1), (unsigned long)id_group(&body->id1),
1447                          child_lockh->cookie);
1448
1449                 if (name) {
1450                         /* usual named request */
1451                         dparent = mds_id2dentry(obd, &body->id1, NULL);
1452                         LASSERT(!IS_ERR(dparent));
1453                         dchild = ll_lookup_one_len(name, dparent, namesize - 1);
1454                         LASSERT(!IS_ERR(dchild));
1455                 } else {
1456                         /* client wants to get attr. by id */
1457                         dchild = mds_id2dentry(obd, &body->id1, NULL);
1458                         LASSERT(!IS_ERR(dchild));
1459                 }
1460                 LDLM_LOCK_PUT(granted_lock);
1461         }
1462
1463         cleanup_phase = 2; /* dchild, dparent, locks */
1464
1465         if (!DENTRY_VALID(dchild)) {
1466                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
1467                 /*
1468                  * in the intent case, the policy clears this error: the
1469                  * disposition is enough.
1470                  */
1471                 rc = -ENOENT;
1472                 GOTO(cleanup, rc);
1473         } else {
1474                 intent_set_disposition(rep, DISP_LOOKUP_POS);
1475         }
1476
1477         if (req->rq_repmsg == NULL) {
1478                 if (dchild->d_flags & DCACHE_CROSS_REF)
1479                         rc = mds_getattr_pack_msg_cf(req, dchild, offset);
1480                 else
1481                         rc = mds_getattr_pack_msg(req, dchild, offset);
1482                 if (rc != 0) {
1483                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
1484                         GOTO (cleanup, rc);
1485                 }
1486         }
1487
1488         rc = mds_getattr_internal(obd, dchild, req, offset, body, reply_offset);        
1489         GOTO(cleanup, rc); /* returns the lock to the client */
1490
1491  cleanup:
1492         switch (cleanup_phase) {
1493         case 2:
1494                 if (resent_req == 0) {
1495                         if (rc && DENTRY_VALID(dchild))
1496                                 ldlm_lock_decref(child_lockh, LCK_PR);
1497                         if (name)
1498                                 ldlm_lock_decref(parent_lockh, LCK_PR);
1499 #ifdef S_PDIROPS
1500                         if (parent_lockh[1].cookie != 0)
1501                                 ldlm_lock_decref(parent_lockh + 1, update_mode);
1502 #endif
1503                         if (dparent)
1504                                 l_dput(dparent);
1505                 }
1506                 l_dput(dchild);
1507         case 1:
1508                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1509         default:
1510                 mds_exit_ucred(&uc);
1511         }
1512         return rc;
1513 }
1514
1515 static int mds_getattr(struct ptlrpc_request *req, int offset)
1516 {
1517         struct obd_device *obd = req->rq_export->exp_obd;
1518         struct lvfs_run_ctxt saved;
1519         struct dentry *de;
1520         struct mds_req_sec_desc *rsd;
1521         struct mds_body *body;
1522         struct lvfs_ucred uc = {NULL, NULL,};
1523         int rc = 0;
1524         ENTRY;
1525
1526         MD_COUNTER_INCREMENT(obd, getattr);
1527
1528         rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
1529         if (!rsd) {
1530                 CERROR("Can't unpack security desc\n");
1531                 RETURN(-EFAULT);
1532         }
1533
1534         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1535                                   lustre_swab_mds_body);
1536         if (body == NULL) {
1537                 CERROR ("Can't unpack body\n");
1538                 RETURN (-EFAULT);
1539         }
1540
1541         rc = mds_init_ucred(&uc, req, rsd);
1542         if (rc) {
1543                 mds_exit_ucred(&uc);
1544                 RETURN(rc);
1545         }
1546
1547         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1548         de = mds_id2dentry(obd, &body->id1, NULL);
1549         if (IS_ERR(de)) {
1550                 rc = req->rq_status = PTR_ERR(de);
1551                 GOTO(out_pop, rc);
1552         }
1553
1554         rc = mds_getattr_pack_msg(req, de, offset);
1555         if (rc != 0) {
1556                 CERROR("mds_getattr_pack_msg: %d\n", rc);
1557                 GOTO(out_pop, rc);
1558         }
1559
1560         req->rq_status = mds_getattr_internal(obd, de, req, offset, body, 0);
1561         l_dput(de);
1562
1563         EXIT;
1564 out_pop:
1565         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1566         mds_exit_ucred(&uc);
1567         return rc;
1568 }
1569
1570 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1571                           unsigned long max_age)
1572 {
1573         int rc;
1574         ENTRY;
1575
1576         spin_lock(&obd->obd_osfs_lock);
1577         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age);
1578         if (rc == 0)
1579                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1580         spin_unlock(&obd->obd_osfs_lock);
1581
1582         RETURN(rc);
1583 }
1584
1585 static int mds_statfs(struct ptlrpc_request *req)
1586 {
1587         struct obd_device *obd = req->rq_export->exp_obd;
1588         int rc, size = sizeof(struct obd_statfs);
1589         ENTRY;
1590
1591         /* This will trigger a watchdog timeout */
1592         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1593                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1594
1595         rc = lustre_pack_reply(req, 1, &size, NULL);
1596         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1597                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1598                 GOTO(out, rc);
1599         }
1600
1601         OBD_COUNTER_INCREMENT(obd, statfs);
1602
1603         /* We call this so that we can cache a bit - 1 jiffie worth */
1604         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1605                             jiffies - HZ);
1606         if (rc) {
1607                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1608                 GOTO(out, rc);
1609         }
1610
1611         EXIT;
1612 out:
1613         req->rq_status = rc;
1614         return rc;
1615 }
1616
1617 static int mds_sync(struct ptlrpc_request *req, int offset)
1618 {
1619         struct obd_device *obd = req->rq_export->exp_obd;
1620         struct mds_obd *mds = &obd->u.mds;
1621         struct mds_body *body;
1622         int rc, size = sizeof(*body);
1623         ENTRY;
1624
1625         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1626                                   lustre_swab_mds_body);
1627         if (body == NULL)
1628                 GOTO(out, rc = -EPROTO);
1629
1630         rc = lustre_pack_reply(req, 1, &size, NULL);
1631         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1632                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1633                 GOTO(out, rc);
1634         }
1635
1636         if (id_ino(&body->id1) == 0) {
1637                 /* an id of zero is taken to mean "sync whole filesystem" */
1638                 rc = fsfilt_sync(obd, mds->mds_sb);
1639                 if (rc)
1640                         GOTO(out, rc);
1641         } else {
1642                 /* just any file to grab fsync method - "file" arg unused */
1643                 struct file *file = mds->mds_rcvd_filp;
1644                 struct mds_body *rep_body;
1645                 struct dentry *de;
1646
1647                 de = mds_id2dentry(obd, &body->id1, NULL);
1648                 if (IS_ERR(de))
1649                         GOTO(out, rc = PTR_ERR(de));
1650
1651                 rc = file->f_op->fsync(NULL, de, 1);
1652                 if (rc)
1653                         GOTO(out, rc);
1654
1655                 rep_body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep_body));
1656                 mds_pack_inode2body(obd, rep_body, de->d_inode,
1657                                     (body->valid & OBD_MD_FID) ? 1 : 0);
1658                 l_dput(de);
1659         }
1660
1661         EXIT;
1662 out:
1663         req->rq_status = rc;
1664         return rc;
1665 }
1666
1667 /* mds_readpage does not take a DLM lock on the inode, because the client must
1668  * already have a PR lock.
1669  *
1670  * If we were to take another one here, a deadlock will result, if another
1671  * thread is already waiting for a PW lock. */
1672 static int mds_readpage(struct ptlrpc_request *req, int offset)
1673 {
1674         struct obd_device *obd = req->rq_export->exp_obd;
1675         struct vfsmount *mnt;
1676         struct dentry *de;
1677         struct file *file;
1678         struct mds_req_sec_desc *rsd;
1679         struct mds_body *body, *repbody;
1680         struct lvfs_run_ctxt saved;
1681         int rc, size = sizeof(*repbody);
1682         struct lvfs_ucred uc = {NULL, NULL,};
1683         ENTRY;
1684
1685         rc = lustre_pack_reply(req, 1, &size, NULL);
1686         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1687                 CERROR("mds: out of memory\n");
1688                 GOTO(out, rc = -ENOMEM);
1689         }
1690
1691         rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
1692         if (!rsd) {
1693                 CERROR("Can't unpack security desc\n");
1694                 GOTO (out, rc = -EFAULT);
1695         }
1696
1697         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1698                                   lustre_swab_mds_body);
1699         if (body == NULL) {
1700                 CERROR("Can't unpack body\n");
1701                 GOTO (out, rc = -EFAULT);
1702         }
1703
1704         rc = mds_init_ucred(&uc, req, rsd);
1705         if (rc) {
1706                 GOTO(out, rc);
1707         }
1708
1709         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1710         de = mds_id2dentry(obd, &body->id1, &mnt);
1711         if (IS_ERR(de))
1712                 GOTO(out_pop, rc = PTR_ERR(de));
1713
1714         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1715
1716         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1717         /* note: in case of an error, dentry_open puts dentry */
1718         if (IS_ERR(file))
1719                 GOTO(out_pop, rc = PTR_ERR(file));
1720
1721         /* body->size is actually the offset -eeb */
1722         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1723                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1724                        body->size, de->d_inode->i_blksize);
1725                 GOTO(out_file, rc = -EFAULT);
1726         }
1727
1728         /* body->nlink is actually the #bytes to read -eeb */
1729         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1730                 CERROR("size %u is not multiple of blocksize %lu\n",
1731                        body->nlink, de->d_inode->i_blksize);
1732                 GOTO(out_file, rc = -EFAULT);
1733         }
1734
1735         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1736         repbody->size = file->f_dentry->d_inode->i_size;
1737         repbody->valid = OBD_MD_FLSIZE;
1738
1739         /* to make this asynchronous make sure that the handling function
1740            doesn't send a reply when this function completes. Instead a
1741            callback function would send the reply */
1742         /* body->size is actually the offset -eeb */
1743         rc = mds_sendpage(req, file, body->size, body->nlink);
1744
1745         EXIT;
1746 out_file:
1747         filp_close(file, 0);
1748 out_pop:
1749         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1750 out:
1751         mds_exit_ucred(&uc);
1752         req->rq_status = rc;
1753         return 0;
1754 }
1755
1756 /* update master MDS ID, which is stored in local inode EA. */
1757 int mds_update_mid(struct obd_device *obd, struct lustre_id *id,
1758                    void *data, int data_len)
1759 {
1760         struct mds_obd *mds = &obd->u.mds;
1761         struct dentry *dentry;
1762         void *handle;
1763         int rc = 0;
1764         ENTRY;
1765
1766         LASSERT(id);
1767         LASSERT(obd);
1768         
1769         dentry = mds_id2dentry(obd, id, NULL);
1770         if (IS_ERR(dentry))
1771                 GOTO(out, rc = PTR_ERR(dentry));
1772
1773         if (!dentry->d_inode) {
1774                 CERROR("Can't find object "DLID4".\n",
1775                        OLID4(id));
1776                 GOTO(out_dentry, rc = -EINVAL);
1777         }
1778
1779         handle = fsfilt_start(obd, dentry->d_inode,
1780                               FSFILT_OP_SETATTR, NULL);
1781         if (IS_ERR(handle))
1782                 GOTO(out_dentry, rc = PTR_ERR(handle));
1783
1784         rc = mds_update_inode_mid(obd, dentry->d_inode, handle,
1785                                   (struct lustre_id *)data);
1786         if (rc) {
1787                 CERROR("Can't update inode "DLID4" master id, "
1788                        "error = %d.\n", OLID4(id), rc);
1789                 GOTO(out_commit, rc);
1790         }
1791
1792         EXIT;
1793 out_commit:
1794         fsfilt_commit(obd, mds->mds_sb, dentry->d_inode,
1795                       handle, 0);
1796 out_dentry:
1797         l_dput(dentry);
1798 out:
1799         return rc;
1800 }
1801 EXPORT_SYMBOL(mds_update_mid);
1802
1803 /* read master MDS ID, which is stored in local inode EA. */
1804 int mds_read_mid(struct obd_device *obd, struct lustre_id *id,
1805                  void *data, int data_len)
1806 {
1807         struct dentry *dentry;
1808         int rc = 0;
1809         ENTRY;
1810
1811         LASSERT(id);
1812         LASSERT(obd);
1813         
1814         dentry = mds_id2dentry(obd, id, NULL);
1815         if (IS_ERR(dentry))
1816                 GOTO(out, rc = PTR_ERR(dentry));
1817
1818         if (!dentry->d_inode) {
1819                 CERROR("Can't find object "DLID4".\n",
1820                        OLID4(id));
1821                 GOTO(out_dentry, rc = -EINVAL);
1822         }
1823
1824         down(&dentry->d_inode->i_sem);
1825         rc = mds_read_inode_mid(obd, dentry->d_inode,
1826                                 (struct lustre_id *)data);
1827         up(&dentry->d_inode->i_sem);
1828         if (rc) {
1829                 CERROR("Can't read inode "DLID4" master id, "
1830                        "error = %d.\n", OLID4(id), rc);
1831                 GOTO(out_dentry, rc);
1832         }
1833
1834         EXIT;
1835 out_dentry:
1836         l_dput(dentry);
1837 out:
1838         return rc;
1839 }
1840 EXPORT_SYMBOL(mds_read_mid);
1841
1842 int mds_read_md(struct obd_device *obd, struct lustre_id *id, 
1843                 char **data, int *datalen)
1844 {
1845         struct dentry *dentry;
1846         struct mds_obd *mds = &obd->u.mds;
1847         int rc = 0, mea = 0;
1848         char *ea;
1849         ENTRY;
1850
1851         LASSERT(id);
1852         LASSERT(obd);
1853         
1854         dentry = mds_id2dentry(obd, id, NULL);
1855         if (IS_ERR(dentry))
1856                 GOTO(out, rc = PTR_ERR(dentry));
1857
1858         if (!dentry->d_inode) {
1859                 CERROR("Can't find object "DLID4".\n",
1860                        OLID4(id));
1861                 GOTO(out_dentry, rc = -EINVAL);
1862         }
1863         if (S_ISDIR(dentry->d_inode->i_mode)) {
1864                 *datalen = obd_packmd(mds->mds_md_exp, NULL, NULL);
1865                 mea = 1; 
1866         } else {
1867                 *datalen = obd_packmd(mds->mds_dt_exp, NULL, NULL); 
1868                 mea = 0;
1869         }
1870         OBD_ALLOC(ea, *datalen);
1871         if (!ea) {
1872                 *datalen = 0;
1873                 GOTO(out_dentry, rc = PTR_ERR(dentry));
1874         } 
1875         *data = ea;
1876         down(&dentry->d_inode->i_sem);
1877         rc = fsfilt_get_md(obd, dentry->d_inode, *data, *datalen,
1878                            (mea ? EA_MEA : EA_LOV));
1879         up(&dentry->d_inode->i_sem);
1880         
1881         if (rc < 0) 
1882                 CERROR("Error %d reading eadata for ino %lu\n",
1883                         rc, dentry->d_inode->i_ino);
1884 out_dentry:
1885         l_dput(dentry);
1886 out:
1887         RETURN(rc);
1888 }
1889 EXPORT_SYMBOL(mds_read_md);
1890
1891 int mds_reint(struct ptlrpc_request *req, int offset,
1892               struct lustre_handle *lockh)
1893 {
1894         struct mds_update_record *rec;
1895         struct mds_req_sec_desc *rsd;
1896         int rc;
1897         ENTRY;
1898
1899         OBD_ALLOC(rec, sizeof(*rec));
1900         if (rec == NULL)
1901                 RETURN(-ENOMEM);
1902
1903         rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF);
1904         if (!rsd) {
1905                 CERROR("Can't unpack security desc\n");
1906                 GOTO(out, rc = -EFAULT);
1907         }
1908
1909         rc = mds_update_unpack(req, offset, rec);
1910         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1911                 CERROR("invalid record\n");
1912                 GOTO(out, req->rq_status = -EINVAL);
1913         }
1914
1915         rc = mds_init_ucred(&rec->ur_uc, req, rsd);
1916         if (rc) {
1917                 GOTO(out, rc);
1918         }
1919
1920         /* rc will be used to interrupt a for loop over multiple records */
1921         rc = mds_reint_rec(rec, offset, req, lockh);
1922
1923  out:
1924         mds_exit_ucred(&rec->ur_uc);
1925         OBD_FREE(rec, sizeof(*rec));
1926         RETURN(rc);
1927 }
1928
1929 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1930                                        struct obd_device *obd, int *process)
1931 {
1932         switch (req->rq_reqmsg->opc) {
1933         case MDS_CONNECT: /* This will never get here, but for completeness. */
1934         case OST_CONNECT: /* This will never get here, but for completeness. */
1935         case MDS_DISCONNECT:
1936         case OST_DISCONNECT:
1937                *process = 1;
1938                RETURN(0);
1939
1940         case MDS_CLOSE:
1941         case MDS_SYNC: /* used in unmounting */
1942         case OBD_PING:
1943         case MDS_REINT:
1944         case LDLM_ENQUEUE:
1945         case OST_CREATE:
1946                 *process = target_queue_recovery_request(req, obd);
1947                 RETURN(0);
1948
1949         default:
1950                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1951                 *process = 0;
1952                 /* XXX what should we set rq_status to here? */
1953                 req->rq_status = -EAGAIN;
1954                 RETURN(ptlrpc_error(req));
1955         }
1956 }
1957
1958 static char *reint_names[] = {
1959         [REINT_SETATTR] "setattr",
1960         [REINT_CREATE]  "create",
1961         [REINT_LINK]    "link",
1962         [REINT_UNLINK]  "unlink",
1963         [REINT_RENAME]  "rename",
1964         [REINT_OPEN]    "open",
1965 };
1966
/* obdo fields requested from obdo_from_inode() when building create replies */
#define FILTER_VALID_FLAGS                                                   \
        (OBD_MD_FLTYPE  | OBD_MD_FLMODE   | OBD_MD_FLGENER |                 \
         OBD_MD_FLSIZE  | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |                 \
         OBD_MD_FLATIME | OBD_MD_FLMTIME  | OBD_MD_FLCTIME |                 \
         OBD_MD_FLID)
1971
1972 static void reconstruct_create(struct ptlrpc_request *req)
1973 {
1974         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1975         struct mds_client_data *mcd = med->med_mcd;
1976         struct dentry *dentry;
1977         struct ost_body *body;
1978         struct lustre_id id;
1979         int rc;
1980         ENTRY;
1981
1982         /* copy rc, transno and disp; steal locks */
1983         mds_req_from_mcd(req, mcd);
1984         if (req->rq_status) {
1985                 EXIT;
1986                 return;
1987         }
1988
1989         id_gen(&id) = 0;
1990         id_group(&id) = 0;
1991
1992         id_ino(&id) = mcd->mcd_last_data;
1993         LASSERT(id_ino(&id) != 0);
1994
1995         dentry = mds_id2dentry(req2obd(req), &id, NULL);
1996         if (IS_ERR(dentry)) {
1997                 CERROR("can't find inode "LPU64"\n", id_ino(&id));
1998                 req->rq_status = PTR_ERR(dentry);
1999                 EXIT;
2000                 return;
2001         }
2002
2003         CWARN("reconstruct reply for x"LPU64" (remote ino) "LPU64" -> %lu/%u\n",
2004               req->rq_xid, id_ino(&id), dentry->d_inode->i_ino,
2005               dentry->d_inode->i_generation);
2006
2007         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
2008         obdo_from_inode(&body->oa, dentry->d_inode, FILTER_VALID_FLAGS);
2009         body->oa.o_id = dentry->d_inode->i_ino;
2010         body->oa.o_generation = dentry->d_inode->i_generation;
2011         body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
2012
2013         down(&dentry->d_inode->i_sem);
2014         rc = mds_read_inode_sid(req2obd(req), dentry->d_inode, &id);
2015         up(&dentry->d_inode->i_sem);
2016         if (rc) {
2017                 CERROR("Can't read inode self id, inode %lu, "
2018                        "rc %d\n", dentry->d_inode->i_ino, rc);
2019                 id_fid(&id) = 0;
2020         }
2021
2022         body->oa.o_fid = id_fid(&id);
2023         body->oa.o_mds = id_group(&id);
2024         l_dput(dentry);
2025
2026         EXIT;
2027 }
2028
2029 static int mds_inode_init_acl(struct obd_device *obd, void *handle,
2030                               struct dentry *de, void *xattr, int xattr_size)
2031 {
2032         struct inode *inode = de->d_inode;
2033         struct posix_acl *acl;
2034         mode_t mode;
2035         int rc = 0;
2036
2037         LASSERT(handle);
2038         LASSERT(inode);
2039         LASSERT(xattr);
2040         LASSERT(xattr_size > 0);
2041
2042         if (!inode->i_op->getxattr || !inode->i_op->setxattr) {
2043                 CERROR("backend fs dosen't support xattr\n");
2044                 return -EOPNOTSUPP;
2045         }
2046
2047         /* set default acl */
2048         if (S_ISDIR(inode->i_mode)) {
2049                 rc = inode->i_op->setxattr(de, XATTR_NAME_ACL_DEFAULT,
2050                                            xattr, xattr_size, 0);
2051                 if (rc) {
2052                         CERROR("set default acl err: %d\n", rc);
2053                         return rc;
2054                 }
2055         }
2056
2057         /* set access acl */
2058         acl = posix_acl_from_xattr(xattr, xattr_size);
2059         if (acl == NULL || IS_ERR(acl)) {
2060                 CERROR("insane attr data\n");
2061                 return PTR_ERR(acl);
2062         }
2063
2064         if (posix_acl_valid(acl)) {
2065                 CERROR("default acl not valid: %d\n", rc);
2066                 rc = -EFAULT;
2067                 goto out;
2068         }
2069
2070         mode = inode->i_mode;
2071         rc = posix_acl_create_masq(acl, &mode);
2072         if (rc < 0) {
2073                 CERROR("create masq err %d\n", rc);
2074                 goto out;
2075         }
2076
2077         if (inode->i_mode != mode) {
2078                 struct iattr iattr = { .ia_valid = ATTR_MODE,
2079                                        .ia_mode = mode };
2080                 int rc2;
2081
2082                 rc2 = fsfilt_setattr(obd, de, handle, &iattr, 0);
2083                 if (rc2) {
2084                         CERROR("setattr mode err: %d\n", rc2);
2085                         rc = rc2;
2086                         goto out;
2087                 }
2088         }
2089
2090         if (rc > 0) {
2091                 /* we didn't change acl except mode bits of some
2092                  * entries, so should be fit into original size.
2093                  */
2094                 rc = posix_acl_to_xattr(acl, xattr, xattr_size);
2095                 LASSERT(rc > 0);
2096
2097                 rc = inode->i_op->setxattr(de, XATTR_NAME_ACL_ACCESS,
2098                                            xattr, xattr_size, 0);
2099                 if (rc)
2100                         CERROR("set access acl err: %d\n", rc);
2101         }
2102 out:
2103         posix_acl_release(acl);
2104         return rc;
2105 }
2106
2107 static int mdt_obj_create(struct ptlrpc_request *req)
2108 {
2109         struct obd_device *obd = req->rq_export->exp_obd;
2110         struct mds_obd *mds = &obd->u.mds;
2111         struct ost_body *body, *repbody;
2112         void *acl = NULL;
2113         int acl_size;
2114         char idname[LL_ID_NAMELEN];
2115         int size = sizeof(*repbody);
2116         struct inode *parent_inode;
2117         struct lvfs_run_ctxt saved;
2118         int rc, cleanup_phase = 0;
2119         struct dentry *new = NULL;
2120         struct dentry_params dp;
2121         int mealen, flags = 0;
2122         struct lvfs_ucred uc;
2123         struct lustre_id id;
2124         struct mea *mea;
2125         void *handle = NULL;
2126         unsigned long cr_inum = 0;
2127         ENTRY;
2128        
2129         DEBUG_REQ(D_HA, req, "create remote object");
2130         parent_inode = mds->mds_unnamed_dir->d_inode;
2131
2132         body = lustre_swab_reqbuf(req, 0, sizeof(*body),
2133                                   lustre_swab_ost_body);
2134         if (body == NULL)
2135                 RETURN(-EFAULT);
2136
2137         /* acl data is packed transparently, no swab here */
2138         LASSERT(req->rq_reqmsg->bufcount >= 2);
2139         acl_size = req->rq_reqmsg->buflens[1];
2140         if (acl_size) {
2141                 acl = lustre_msg_buf(req->rq_reqmsg, 1, acl_size);
2142                 if (!acl) {
2143                         CERROR("No default acl buf?\n");
2144                         RETURN(-EFAULT);
2145                 }
2146         }
2147
2148         rc = lustre_pack_reply(req, 1, &size, NULL);
2149         if (rc)
2150                 RETURN(rc);
2151
2152         MDS_CHECK_RESENT(req, reconstruct_create(req));
2153
2154         uc.luc_lsd = NULL;
2155         uc.luc_ginfo = NULL;
2156         uc.luc_uid = body->oa.o_uid;
2157         uc.luc_gid = body->oa.o_gid;
2158         uc.luc_fsuid = body->oa.o_uid;
2159         uc.luc_fsgid = body->oa.o_gid;
2160
2161         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
2162         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
2163
2164         /* in REPLAY case inum should be given (client or other MDS fills it) */
2165         if (body->oa.o_id && ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) ||
2166             (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))) {
2167                 /*
2168                  * this is re-create request from MDS holding directory name.
2169                  * we have to lookup given ino/gen first. if it exists (good
2170                  * case) then there is nothing to do. if it does not then we
2171                  * have to recreate it.
2172                  */
2173                 id_ino(&id) = body->oa.o_id;
2174                 id_gen(&id) = body->oa.o_generation;
2175  
2176                 new = mds_id2dentry(obd, &id, NULL);
2177                 if (!IS_ERR(new) && new->d_inode) {
2178                         struct lustre_id sid;
2179                                 
2180                         CWARN("mkdir() repairing is on its way: %lu/%lu\n",
2181                               (unsigned long)id_ino(&id), (unsigned long)id_gen(&id));
2182                         
2183                         obdo_from_inode(&repbody->oa, new->d_inode,
2184                                         FILTER_VALID_FLAGS);
2185                         
2186                         repbody->oa.o_id = new->d_inode->i_ino;
2187                         repbody->oa.o_generation = new->d_inode->i_generation;
2188                         repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
2189                         cleanup_phase = 1;
2190
2191                         down(&new->d_inode->i_sem);
2192                         rc = mds_read_inode_sid(obd, new->d_inode, &sid);
2193                         up(&new->d_inode->i_sem);
2194                         if (rc) {
2195                                 CERROR("Can't read inode self id "
2196                                        "inode %lu, rc %d.\n",
2197                                        new->d_inode->i_ino, rc);
2198                                 GOTO(cleanup, rc);
2199                         }
2200
2201                         repbody->oa.o_fid = id_fid(&sid);
2202                         repbody->oa.o_mds = id_group(&sid);
2203                         LASSERT(id_fid(&sid) != 0);
2204
2205                         /* 
2206                          * here we could use fid passed in body->oa.o_fid and
2207                          * thus avoid mds_read_inode_sid().
2208                          */
2209                         cr_inum = new->d_inode->i_ino;
2210                         GOTO(cleanup, rc = 0);
2211                 }
2212         }
2213         
2214         down(&parent_inode->i_sem);
2215         handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
2216         if (IS_ERR(handle)) {
2217                 up(&parent_inode->i_sem);
2218                 CERROR("fsfilt_start() failed, rc = %d\n",
2219                        (int)PTR_ERR(handle));
2220                 GOTO(cleanup, rc = PTR_ERR(handle));
2221         }
2222         cleanup_phase = 1; /* transaction */
2223
2224 repeat:
2225         rc = sprintf(idname, "%u.%u", ll_insecure_random_int(), current->pid);
2226         new = lookup_one_len(idname, mds->mds_unnamed_dir, rc);
2227         if (IS_ERR(new)) {
2228                 CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
2229                        obd->obd_name, idname, (int) PTR_ERR(new));
2230                 fsfilt_commit(obd, mds->mds_sb, new->d_inode, handle, 0);
2231                 up(&parent_inode->i_sem);
2232                 RETURN(PTR_ERR(new));
2233         } else if (new->d_inode) {
2234                 CERROR("%s: name exists. repeat\n", obd->obd_name);
2235                 goto repeat;
2236         }
2237
2238         new->d_fsdata = (void *)&dp;
2239         dp.p_inum = 0;
2240         dp.p_ptr = req;
2241
2242         if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) ||
2243             (body->oa.o_flags & OBD_FL_RECREATE_OBJS)) {
2244                 LASSERT(body->oa.o_id != 0);
2245                 dp.p_inum = body->oa.o_id;
2246                 DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
2247                           (unsigned long)body->oa.o_id,
2248                           (unsigned long)body->oa.o_generation);
2249         }
2250
2251         rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
2252         if (rc == 0) {
2253                 if (acl) {
2254                         rc = mds_inode_init_acl(obd, handle, new,
2255                                                 acl, acl_size);
2256                         if (rc) {
2257                                 up(&parent_inode->i_sem);
2258                                 GOTO(cleanup, rc);
2259                         }
2260                 }
2261                 if ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) ||
2262                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
2263                         new->d_inode->i_generation = body->oa.o_generation;
2264                         mark_inode_dirty(new->d_inode);
2265                         
2266                         /*
2267                          * avoiding asserts in cache flush case, as
2268                          * @body->oa.o_id should be zero.
2269                          */
2270                         if (body->oa.o_id) {
2271                                 LASSERTF(body->oa.o_id == new->d_inode->i_ino, 
2272                                          "BUG 3550: failed to recreate obj "
2273                                          LPU64" -> %lu\n", body->oa.o_id,
2274                                          new->d_inode->i_ino);
2275                                 
2276                                 LASSERTF(body->oa.o_generation == 
2277                                          new->d_inode->i_generation,
2278                                          "BUG 3550: failed to recreate obj/gen "
2279                                          LPU64"/%u -> %lu/%u\n", body->oa.o_id,
2280                                          body->oa.o_generation,
2281                                          new->d_inode->i_ino, 
2282                                          new->d_inode->i_generation);
2283                         }
2284                 }
2285                 
2286                 obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
2287                 repbody->oa.o_id = new->d_inode->i_ino;
2288                 repbody->oa.o_generation = new->d_inode->i_generation;
2289                 repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FID;
2290
2291                 if ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) ||
2292                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
2293                         id_group(&id) = mds->mds_num;
2294                 
2295                         LASSERT(body->oa.o_fid != 0);
2296                         id_fid(&id) = body->oa.o_fid;
2297
2298                         LASSERT(body->oa.o_id != 0);
2299                         id_ino(&id) = repbody->oa.o_id;
2300                         id_gen(&id) = repbody->oa.o_generation;
2301                 
2302                         down(&new->d_inode->i_sem);
2303                         rc = mds_update_inode_sid(obd, new->d_inode, handle, &id);
2304                         up(&new->d_inode->i_sem);
2305
2306                         /* 
2307                          * make sure, that fid is up-to-date.
2308                          */
2309                         mds_set_last_fid(obd, id_fid(&id));
2310                 } else {
2311                         /*
2312                          * allocate new sid, as object is created from scratch
2313                          * and this is not replay.
2314                          */
2315                         down(&new->d_inode->i_sem);
2316                         rc = mds_alloc_inode_sid(obd, new->d_inode, handle, &id);
2317                         up(&new->d_inode->i_sem);
2318                 }
2319                 if (rc) {
2320                         CERROR("Can't update lustre ID for inode %lu, "
2321                                "error = %d\n", new->d_inode->i_ino, rc);
2322                         GOTO(cleanup, rc);
2323                 }
2324
2325                 /* initializing o_fid after it is allocated. */
2326                 repbody->oa.o_fid = id_fid(&id);
2327                 repbody->oa.o_mds = id_group(&id);
2328
2329                 rc = fsfilt_del_dir_entry(obd, new);
2330                 up(&parent_inode->i_sem);
2331                 if (rc) {
2332                         CERROR("can't remove name for object: %d\n", rc);
2333                         GOTO(cleanup, rc);
2334                 }
2335                 
2336                 cleanup_phase = 2; /* created directory object */
2337
2338                 CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
2339                        (unsigned long)new->d_inode->i_ino,
2340                        (unsigned long)new->d_inode->i_generation,
2341                        (unsigned)new->d_inode->i_mode);
2342                 cr_inum = new->d_inode->i_ino;
2343         } else {
2344                 up(&parent_inode->i_sem);
2345                 CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
2346                 GOTO(cleanup, rc);
2347         }
2348
2349         if (body->oa.o_valid & OBD_MD_FLID) {
2350                 /* this is new object for splitted dir. We have to prevent
2351                  * recursive splitting on it -bzzz */
2352                 mealen = obd_size_diskmd(mds->mds_md_exp, NULL);
2353
2354                 OBD_ALLOC(mea, mealen);
2355                 if (mea == NULL)
2356                         GOTO(cleanup, rc = -ENOMEM);
2357
2358                 mea->mea_magic = MEA_MAGIC_ALL_CHARS;
2359                 mea->mea_master = 0;
2360                 mea->mea_count = 0;
2361
2362                 down(&new->d_inode->i_sem);
2363                 rc = fsfilt_set_md(obd, new->d_inode, handle,
2364                                    mea, mealen, EA_MEA);
2365                 up(&new->d_inode->i_sem);
2366                 if (rc)
2367                         CERROR("fsfilt_set_md() failed, "
2368                                "rc = %d\n", rc);
2369
2370                 OBD_FREE(mea, mealen);
2371                 
2372                 CDEBUG(D_OTHER, "%s: mark non-splittable %lu/%u - %d\n",
2373                        obd->obd_name, new->d_inode->i_ino,
2374                        new->d_inode->i_generation, flags);
2375         } else if (body->oa.o_easize) {
2376                 /* we pass LCK_EX to split routine to signal that we have
2377                  * exclusive access to the directory. simple because nobody
2378                  * knows it already exists -bzzz */
2379                 rc = mds_try_to_split_dir(obd, new, NULL,
2380                                           body->oa.o_easize, LCK_EX);
2381                 if (rc < 0) {
2382                         CERROR("Can't split directory %lu, error = %d.\n",
2383                                new->d_inode->i_ino, rc);
2384                 } else {
2385                         rc = 0;
2386                 }
2387         }
2388
2389         EXIT;
2390 cleanup:
2391         switch (cleanup_phase) {
2392         case 2: /* object has been created, but we'll may want to replay it later */
2393                 if (rc == 0)
2394                         ptlrpc_require_repack(req);
2395         case 1: /* transaction */
2396                 rc = mds_finish_transno(mds, parent_inode, handle,
2397                                         req, rc, cr_inum);
2398         }
2399
2400         l_dput(new);
2401         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
2402         return rc;
2403 }
2404
2405 static int mdt_get_info(struct ptlrpc_request *req)
2406 {
2407         struct obd_export *exp = req->rq_export;
2408         int keylen, rc = 0;
2409         char *key;
2410         ENTRY;
2411
2412         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
2413         if (key == NULL) {
2414                 DEBUG_REQ(D_HA, req, "no get_info key");
2415                 RETURN(-EFAULT);
2416         }
2417         keylen = req->rq_reqmsg->buflens[0];
2418
2419         if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) &&
2420             (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) &&
2421             (keylen < strlen("rootid") || strcmp(key, "rootid") != 0))
2422                 RETURN(-EPROTO);
2423
2424         if (keylen >= strlen("rootid") && !strcmp(key, "rootid")) {
2425                 struct lustre_id *reply;
2426                 int size = sizeof(*reply);
2427                 
2428                 rc = lustre_pack_reply(req, 1, &size, NULL);
2429                 if (rc)
2430                         RETURN(rc);
2431
2432                 reply = lustre_msg_buf(req->rq_repmsg, 0, size);
2433                 rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
2434         } else {
2435                 obd_id *reply;
2436                 int size = sizeof(*reply);
2437                 
2438                 rc = lustre_pack_reply(req, 1, &size, NULL);
2439                 if (rc)
2440                         RETURN(rc);
2441
2442                 reply = lustre_msg_buf(req->rq_repmsg, 0, size);
2443                 rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply);
2444         }
2445
2446         req->rq_repmsg->status = 0;
2447         RETURN(rc);
2448 }
2449
2450 static int mds_set_info(struct obd_export *exp, __u32 keylen,
2451                         void *key, __u32 vallen, void *val)
2452 {
2453         struct obd_device *obd;
2454         struct mds_obd *mds;
2455         int rc = 0;
2456         ENTRY;
2457
2458         obd = class_exp2obd(exp);
2459         if (obd == NULL) {
2460                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2461                        exp->exp_handle.h_cookie);
2462                 RETURN(-EINVAL);
2463         }
2464
2465         mds = &obd->u.mds;
2466         if (keylen >= strlen("mds_type") &&
2467              memcmp(key, "mds_type", keylen) == 0) {
2468                 int valsize;
2469                 __u32 group;
2470                 
2471                 CDEBUG(D_IOCTL, "set mds type to %x\n", *(int*)val);
2472                 
2473                 mds->mds_obd_type = *(int*)val;
2474                 group = FILTER_GROUP_FIRST_MDS + mds->mds_obd_type;
2475                 valsize = sizeof(group);
2476                 
2477                 /* mds number has been changed, so the corresponding obdfilter
2478                  * exp need to be changed too. */
2479                 rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"),
2480                                   "mds_conn", valsize, &group);
2481                 RETURN(rc);
2482         }
2483         CDEBUG(D_IOCTL, "invalid key\n");
2484         RETURN(-EINVAL);
2485 }
2486
2487 static int mdt_set_info(struct ptlrpc_request *req)
2488 {
2489         char *key, *val;
2490         struct obd_export *exp = req->rq_export;
2491         int keylen, rc = 0, vallen;
2492         ENTRY;
2493
2494         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
2495         if (key == NULL) {
2496                 DEBUG_REQ(D_HA, req, "no set_info key");
2497                 RETURN(-EFAULT);
2498         }
2499         keylen = req->rq_reqmsg->buflens[0];
2500
2501         if (keylen == strlen("mds_type") &&
2502             memcmp(key, "mds_type", keylen) == 0) {
2503                 rc = lustre_pack_reply(req, 0, NULL, NULL);
2504                 if (rc)
2505                         RETURN(rc);
2506                 
2507                 val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
2508                 vallen = req->rq_reqmsg->buflens[1];
2509
2510                 rc = obd_set_info(exp, keylen, key, vallen, val);
2511                 req->rq_repmsg->status = 0;
2512                 RETURN(rc);
2513         }
2514         CDEBUG(D_IOCTL, "invalid key\n");
2515         RETURN(-EINVAL);
2516 }
2517
2518 static void mds_revoke_export_locks(struct obd_export *exp)
2519 {
2520         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
2521         struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
2522         struct ldlm_lock *lock, *next;
2523         struct ldlm_lock_desc desc;
2524
2525         if (!exp->u.eu_mds_data.med_remote)
2526                 return;
2527
2528         ENTRY;
2529         l_lock(&ns->ns_lock);
2530         list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
2531                 if (lock->l_req_mode != lock->l_granted_mode)
2532                         continue;
2533
2534                 LASSERT(lock->l_resource);
2535                 if (lock->l_resource->lr_type != LDLM_IBITS &&
2536                     lock->l_resource->lr_type != LDLM_PLAIN)
2537                         continue;
2538
2539                 if (lock->l_flags & LDLM_FL_AST_SENT)
2540                         continue;
2541
2542                 lock->l_flags |= LDLM_FL_AST_SENT;
2543
2544                 /* the desc just pretend to exclusive */
2545                 ldlm_lock2desc(lock, &desc);
2546                 desc.l_req_mode = LCK_EX;
2547                 desc.l_granted_mode = 0;
2548
2549                 lock->l_blocking_ast(lock, &desc, NULL, LDLM_CB_BLOCKING);
2550         }
2551         l_unlock(&ns->ns_lock);
2552         EXIT;
2553 }
2554
2555 static int mds_msg_check_version(struct lustre_msg *msg)
2556 {
2557         int rc;
2558
2559         switch (msg->opc) {
2560         case MDS_CONNECT:
2561         case MDS_DISCONNECT:
2562         case OBD_PING:
2563                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
2564                 if (rc)
2565                         CERROR("bad opc %u version %08x, expecting %08x\n",
2566                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
2567                 break;
2568         case MDS_STATFS:
2569         case MDS_GETSTATUS:
2570         case MDS_GETATTR:
2571         case MDS_GETATTR_LOCK:
2572         case MDS_READPAGE:
2573         case MDS_REINT:
2574         case MDS_CLOSE:
2575         case MDS_DONE_WRITING:
2576         case MDS_PIN:
2577         case MDS_SYNC:
2578                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
2579                 if (rc)
2580                         CERROR("bad opc %u version %08x, expecting %08x\n",
2581                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
2582                 break;
2583         case LDLM_ENQUEUE:
2584         case LDLM_CONVERT:
2585         case LDLM_BL_CALLBACK:
2586         case LDLM_CP_CALLBACK:
2587                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
2588                 if (rc)
2589                         CERROR("bad opc %u version %08x, expecting %08x\n",
2590                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
2591                 break;
2592         case OBD_LOG_CANCEL:
2593         case LLOG_ORIGIN_HANDLE_OPEN:
2594         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
2595         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
2596         case LLOG_ORIGIN_HANDLE_READ_HEADER:
2597         case LLOG_ORIGIN_HANDLE_CLOSE:
2598         case LLOG_CATINFO:
2599                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
2600                 if (rc)
2601                         CERROR("bad opc %u version %08x, expecting %08x\n",
2602                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
2603                 break;
2604         case OST_CREATE:
2605         case OST_WRITE:
2606         case OST_GET_INFO:
2607         case OST_SET_INFO:
2608                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
2609                 if (rc)
2610                         CERROR("bad opc %u version %08x, expecting %08x\n",
2611                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
2612                 break;
2613         case SEC_INIT:
2614         case SEC_INIT_CONTINUE:
2615         case SEC_FINI:
2616                 rc = 0;
2617                 break;
2618         default:
2619                 CERROR("MDS unknown opcode %d\n", msg->opc);
2620                 rc = -ENOTSUPP;
2621                 break;
2622         }
2623
2624         return rc;
2625 }
2626
2627 int mds_handle(struct ptlrpc_request *req)
2628 {
2629         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
2630         struct obd_device *obd = NULL;
2631         struct mds_obd *mds = NULL; /* quell gcc overwarning */
2632         int rc = 0;
2633         ENTRY;
2634
2635         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
2636
2637         rc = mds_msg_check_version(req->rq_reqmsg);
2638         if (rc) {
2639                 CERROR("MDS drop mal-formed request\n");
2640                 RETURN(rc);
2641         }
2642
2643         /* Security opc should NOT trigger any recovery events */
2644         if (req->rq_reqmsg->opc == SEC_INIT ||
2645             req->rq_reqmsg->opc == SEC_INIT_CONTINUE) {
2646                 if (req->rq_export) {
2647                         mds_req_add_idmapping(req,
2648                                               &req->rq_export->exp_mds_data);
2649                         mds_revoke_export_locks(req->rq_export);
2650                 }
2651                 GOTO(out, rc = 0);
2652         } else if (req->rq_reqmsg->opc == SEC_FINI) {
2653                 if (req->rq_export) {
2654                         mds_req_del_idmapping(req,
2655                                               &req->rq_export->exp_mds_data);
2656                         mds_revoke_export_locks(req->rq_export);
2657                 }
2658                 GOTO(out, rc = 0);
2659         }
2660
2661         LASSERT(current->journal_info == NULL);
2662         /* XXX identical to OST */
2663         if (req->rq_reqmsg->opc != MDS_CONNECT) {
2664                 struct mds_export_data *med;
2665                 int recovering;
2666
2667                 if (req->rq_export == NULL) {
2668                         CERROR("operation %d on unconnected MDS from %s\n",
2669                                req->rq_reqmsg->opc,
2670                                req->rq_peerstr);
2671                         req->rq_status = -ENOTCONN;
2672                         GOTO(out, rc = -ENOTCONN);
2673                 }
2674
2675                 med = &req->rq_export->exp_mds_data;
2676                 obd = req->rq_export->exp_obd;
2677                 mds = &obd->u.mds;
2678
2679                 /* sanity check: if the xid matches, the request must
2680                  * be marked as a resent or replayed */
2681                 if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) ||
2682                    req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid)) {
2683                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
2684                                  (MSG_RESENT | MSG_REPLAY),
2685                                  "rq_xid "LPU64" matches last_xid, "
2686                                  "expected RESENT flag\n",
2687                                  req->rq_xid);
2688                 }
2689                 /* else: note the opposite is not always true; a
2690                  * RESENT req after a failover will usually not match
2691                  * the last_xid, since it was likely never
2692                  * committed. A REPLAYed request will almost never
2693                  * match the last xid, however it could for a
2694                  * committed, but still retained, open. */
2695
2696                 spin_lock_bh(&obd->obd_processing_task_lock);
2697                 recovering = obd->obd_recovering;
2698                 spin_unlock_bh(&obd->obd_processing_task_lock);
2699                 if (recovering) {
2700                         rc = mds_filter_recovery_request(req, obd,
2701                                                          &should_process);
2702                         if (rc || should_process == 0) {
2703                                 RETURN(rc);
2704                         } else if (should_process < 0) {
2705                                 req->rq_status = should_process;
2706                                 rc = ptlrpc_error(req);
2707                                 RETURN(rc);
2708                         }
2709                 }
2710         }
2711
2712         switch (req->rq_reqmsg->opc) {
2713         case MDS_CONNECT:
2714                 DEBUG_REQ(D_INODE, req, "connect");
2715                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
2716                 rc = target_handle_connect(req);
2717                 if (!rc) {
2718                         struct mds_export_data *med;
2719
2720                         LASSERT(req->rq_export);
2721                         med = &req->rq_export->u.eu_mds_data;
2722                         mds_init_export_data(req, med);
2723                         mds_req_add_idmapping(req, med);
2724
2725                         /* Now that we have an export, set mds. */
2726                         obd = req->rq_export->exp_obd;
2727                         mds = mds_req2mds(req);
2728                 }
2729                 break;
2730
2731         case MDS_DISCONNECT:
2732                 DEBUG_REQ(D_INODE, req, "disconnect");
2733                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
2734                 rc = target_handle_disconnect(req);
2735                 req->rq_status = rc;            /* superfluous? */
2736                 break;
2737
2738         case MDS_GETSTATUS:
2739                 DEBUG_REQ(D_INODE, req, "getstatus");
2740                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
2741                 rc = mds_getstatus(req);
2742                 break;
2743
2744         case MDS_GETATTR:
2745                 DEBUG_REQ(D_INODE, req, "getattr");
2746                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
2747                 rc = mds_getattr(req, MDS_REQ_REC_OFF);
2748                 break;
2749
2750         case MDS_GETATTR_LOCK: {
2751                 struct lustre_handle lockh;
2752                 DEBUG_REQ(D_INODE, req, "getattr_lock");
2753                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_LOCK_NET, 0);
2754
2755                 /* If this request gets a reconstructed reply, we won't be
2756                  * acquiring any new locks in mds_getattr_lock, so we don't
2757                  * want to cancel.
2758                  */
2759                 lockh.cookie = 0;
2760                 rc = mds_getattr_lock(req, MDS_REQ_REC_OFF, &lockh,
2761                                       MDS_INODELOCK_UPDATE);
2762                 /* this non-intent call (from an ioctl) is special */
2763                 req->rq_status = rc;
2764                 if (rc == 0 && lockh.cookie)
2765                         ldlm_lock_decref(&lockh, LCK_PR);
2766                 break;
2767         }
2768         case MDS_STATFS:
2769                 DEBUG_REQ(D_INODE, req, "statfs");
2770                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
2771                 rc = mds_statfs(req);
2772                 break;
2773
2774         case MDS_READPAGE:
2775                 DEBUG_REQ(D_INODE, req, "readpage");
2776                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
2777                 rc = mds_readpage(req, MDS_REQ_REC_OFF);
2778
2779                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
2780                         if (req->rq_reply_state) {
2781                                 lustre_free_reply_state (req->rq_reply_state);
2782                                 req->rq_reply_state = NULL;
2783                         }
2784                         RETURN(0);
2785                 }
2786
2787                 break;
2788         case MDS_REINT: {
2789                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
2790                                              sizeof (*opcp));
2791                 __u32  opc;
2792                 int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
2793                                mds->mds_max_cookiesize};
2794                 int bufcount;
2795
2796                 /* NB only peek inside req now; mds_reint() will swab it */
2797                 if (opcp == NULL) {
2798                         CERROR ("Can't inspect opcode\n");
2799                         rc = -EINVAL;
2800                         break;
2801                 }
2802                 opc = *opcp;
2803                 if (lustre_msg_swabbed (req->rq_reqmsg))
2804                         __swab32s(&opc);
2805
2806                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
2807                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
2808                            reint_names[opc] == NULL) ? reint_names[opc] :
2809                                                        "unknown opcode");
2810
2811                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
2812
2813                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
2814                         bufcount = 3;
2815                 else if (opc == REINT_OPEN)
2816                         bufcount = 2;
2817                 else
2818                         bufcount = 1;
2819
2820                 rc = lustre_pack_reply(req, bufcount, size, NULL);
2821                 if (rc)
2822                         break;
2823
2824                 rc = mds_reint(req, MDS_REQ_REC_OFF, NULL);
2825                 fail = OBD_FAIL_MDS_REINT_NET_REP;
2826                 break;
2827         }
2828
2829         case MDS_CLOSE:
2830                 DEBUG_REQ(D_INODE, req, "close");
2831                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
2832                 rc = mds_close(req, MDS_REQ_REC_OFF);
2833                 break;
2834
2835         case MDS_DONE_WRITING:
2836                 DEBUG_REQ(D_INODE, req, "done_writing");
2837                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
2838                 rc = mds_done_writing(req, MDS_REQ_REC_OFF);
2839                 break;
2840
2841         case MDS_PIN:
2842                 DEBUG_REQ(D_INODE, req, "pin");
2843                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
2844                 rc = mds_pin(req, MDS_REQ_REC_OFF);
2845                 break;
2846
2847         case MDS_SYNC:
2848                 DEBUG_REQ(D_INODE, req, "sync");
2849                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
2850                 rc = mds_sync(req, MDS_REQ_REC_OFF);
2851                 break;
2852
2853         case OBD_PING:
2854                 DEBUG_REQ(D_INODE, req, "ping");
2855                 rc = target_handle_ping(req);
2856                 break;
2857
2858         case OBD_LOG_CANCEL:
2859                 CDEBUG(D_INODE, "log cancel\n");
2860                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
2861                 rc = -ENOTSUPP; /* la la la */
2862                 break;
2863
2864         case LDLM_ENQUEUE:
2865                 DEBUG_REQ(D_INODE, req, "enqueue");
2866                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
2867                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2868                                          ldlm_server_blocking_ast, NULL);
2869                 fail = OBD_FAIL_LDLM_REPLY;
2870                 break;
2871         case LDLM_CONVERT:
2872                 DEBUG_REQ(D_INODE, req, "convert");
2873                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
2874                 rc = ldlm_handle_convert(req);
2875                 break;
2876         case LDLM_BL_CALLBACK:
2877         case LDLM_CP_CALLBACK:
2878                 DEBUG_REQ(D_INODE, req, "callback");
2879                 CERROR("callbacks should not happen on MDS\n");
2880                 LBUG();
2881                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
2882                 break;
2883         case LLOG_ORIGIN_HANDLE_OPEN:
2884                 DEBUG_REQ(D_INODE, req, "llog_init");
2885                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
2886                 rc = llog_origin_handle_open(req);
2887                 break;
2888         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
2889                 DEBUG_REQ(D_INODE, req, "llog next block");
2890                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
2891                 rc = llog_origin_handle_next_block(req);
2892                 break;
2893         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
2894                 DEBUG_REQ(D_INODE, req, "llog prev block");
2895                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
2896                 rc = llog_origin_handle_prev_block(req);
2897                 break;
2898         case LLOG_ORIGIN_HANDLE_READ_HEADER:
2899                 DEBUG_REQ(D_INODE, req, "llog read header");
2900                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
2901                 rc = llog_origin_handle_read_header(req);
2902                 break;
2903         case LLOG_ORIGIN_HANDLE_CLOSE:
2904                 DEBUG_REQ(D_INODE, req, "llog close");
2905                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
2906                 rc = llog_origin_handle_close(req);
2907                 break;
2908         case OST_CREATE:
2909                 DEBUG_REQ(D_INODE, req, "ost_create");
2910                 rc = mdt_obj_create(req);
2911                 break;
2912         case OST_GET_INFO:
2913                 DEBUG_REQ(D_INODE, req, "get_info");
2914                 rc = mdt_get_info(req);
2915                 break;
2916         case OST_SET_INFO:
2917                 DEBUG_REQ(D_INODE, req, "set_info");
2918                 rc = mdt_set_info(req);
2919                 break;
2920         case OST_WRITE:
2921                 CDEBUG(D_INODE, "write\n");
2922                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
2923                 rc = ost_brw_write(req, NULL);
2924                 LASSERT(current->journal_info == NULL);
2925                 /* mdt_brw sends its own replies */
2926                 RETURN(rc);
2927                 break;
2928         case LLOG_CATINFO:
2929                 DEBUG_REQ(D_INODE, req, "llog catinfo");
2930                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
2931                 rc = llog_catinfo(req);
2932                 break;
2933         default:
2934                 req->rq_status = -ENOTSUPP;
2935                 rc = ptlrpc_error(req);
2936                 RETURN(rc);
2937         }
2938
2939         LASSERT(current->journal_info == NULL);
2940
2941         EXIT;
2942
2943         /* If we're DISCONNECTing, the mds_export_data is already freed */
2944         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
2945                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
2946                 struct obd_device *obd = list_entry(mds, struct obd_device,
2947                                                     u.mds);
2948                 req->rq_repmsg->last_xid =
2949                         le64_to_cpu(med->med_mcd->mcd_last_xid);
2950
2951                 if (!obd->obd_no_transno) {
2952                         req->rq_repmsg->last_committed =
2953                                 obd->obd_last_committed;
2954                 } else {
2955                         DEBUG_REQ(D_IOCTL, req,
2956                                   "not sending last_committed update");
2957                 }
2958                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
2959                        ", xid "LPU64"\n",
2960                        mds->mds_last_transno, obd->obd_last_committed,
2961                        req->rq_xid);
2962         }
2963  out:
2964
2965
2966         target_send_reply(req, rc, fail);
2967         return 0;
2968 }
2969
2970 /* Update the server data on disk.  This stores the new mount_count and also the
2971  * last_rcvd value to disk.  If we don't have a clean shutdown, then the server
2972  * last_rcvd value may be less than that of the clients.  This will alert us
2973  * that we may need to do client recovery.
2974  *
2975  * Also assumes for mds_last_transno that we are not modifying it (no locking).
2976  */
2977 int mds_update_server_data(struct obd_device *obd, int force_sync)
2978 {
2979         struct mds_obd *mds = &obd->u.mds;
2980         struct mds_server_data *msd = mds->mds_server_data;
2981         struct file *filp = mds->mds_rcvd_filp;
2982         struct lvfs_run_ctxt saved;
2983         loff_t off = 0;
2984         int rc;
2985         ENTRY;
2986
2987         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2988         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
2989
2990         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
2991                mds->mds_mount_count, mds->mds_last_transno);
2992         rc = fsfilt_write_record(obd, filp, msd, sizeof(*msd), &off, force_sync);
2993         if (rc)
2994                 CERROR("error writing MDS server data: rc = %d\n", rc);
2995         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2996
2997         RETURN(rc);
2998 }
2999
3000 /* saves last allocated fid counter to file. */
3001 int mds_update_last_fid(struct obd_device *obd, void *handle,
3002                         int force_sync)
3003 {
3004         struct mds_obd *mds = &obd->u.mds;
3005         struct file *filp = mds->mds_fid_filp;
3006         struct lvfs_run_ctxt saved;
3007         loff_t off = 0;
3008         __u64 last_fid;
3009         int rc = 0;
3010         ENTRY;
3011
3012         spin_lock(&mds->mds_last_fid_lock);
3013         last_fid = mds->mds_last_fid;
3014         spin_unlock(&mds->mds_last_fid_lock);
3015
3016         CDEBUG(D_SUPER, "MDS last_fid is #"LPU64"\n",
3017                last_fid);
3018
3019         if (handle) {
3020                 fsfilt_add_journal_cb(obd, mds->mds_sb, last_fid,
3021                                       handle, mds_commit_last_fid_cb,
3022                                       NULL);
3023         }
3024                 
3025         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3026         rc = fsfilt_write_record(obd, filp, &last_fid, sizeof(last_fid),
3027                                  &off, force_sync);
3028         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3029
3030         if (rc) {
3031                 CERROR("error writing MDS last_fid #"LPU64
3032                        ", err = %d\n", last_fid, rc);
3033                 RETURN(rc);
3034         }
3035                 
3036         CDEBUG(D_SUPER, "wrote fid #"LPU64" at idx "
3037                "%llu: err = %d\n", last_fid, off, rc);
3038
3039         RETURN(rc);
3040 }
3041
3042 void mds_set_last_fid(struct obd_device *obd, __u64 fid)
3043 {
3044         struct mds_obd *mds = &obd->u.mds;
3045
3046         spin_lock(&mds->mds_last_fid_lock);
3047         if (fid > mds->mds_last_fid)
3048                 mds->mds_last_fid = fid;
3049         spin_unlock(&mds->mds_last_fid_lock);
3050 }
3051
3052 void mds_commit_last_transno_cb(struct obd_device *obd,
3053                                 __u64 transno, void *data,
3054                                 int error)
3055 {
3056         obd_transno_commit_cb(obd, transno, error);
3057 }
3058
3059 void mds_commit_last_fid_cb(struct obd_device *obd,
3060                             __u64 fid, void *data,
3061                             int error)
3062 {
3063         if (error) {
3064                 CERROR("%s: fid "LPD64" commit error: %d\n",
3065                        obd->obd_name, fid, error);
3066                 return;
3067         }
3068         
3069         CDEBUG(D_HA, "%s: fid "LPD64" committed\n",
3070                obd->obd_name, fid);
3071 }
3072
3073 __u64 mds_alloc_fid(struct obd_device *obd)
3074 {
3075         struct mds_obd *mds = &obd->u.mds;
3076         __u64 fid;
3077         
3078         spin_lock(&mds->mds_last_fid_lock);
3079         fid = ++mds->mds_last_fid;
3080         spin_unlock(&mds->mds_last_fid_lock);
3081
3082         return fid;
3083 }
3084
3085 /*
3086  * allocates new lustre_id on passed @inode and saves it to inode EA.
3087  */
3088 int mds_alloc_inode_sid(struct obd_device *obd, struct inode *inode,
3089                         void *handle, struct lustre_id *id)
3090 {
3091         struct mds_obd *mds = &obd->u.mds;
3092         int alloc = 0, rc = 0;
3093         ENTRY;
3094
3095         LASSERT(obd != NULL);
3096         LASSERT(inode != NULL);
3097
3098         if (id == NULL) {
3099                 OBD_ALLOC(id, sizeof(*id));
3100                 if (id == NULL)
3101                         RETURN(-ENOMEM);
3102                 alloc = 1;
3103         }
3104
3105         id_group(id) = mds->mds_num;
3106         id_fid(id) = mds_alloc_fid(obd);
3107         id_ino(id) = inode->i_ino;
3108         id_gen(id) = inode->i_generation;
3109         id_type(id) = (S_IFMT & inode->i_mode);
3110
3111         rc = mds_update_inode_sid(obd, inode, handle, id);
3112         if (rc) {
3113                 CERROR("Can't update inode FID EA, "
3114                        "rc = %d\n", rc);
3115         }
3116
3117         if (alloc)
3118                 OBD_FREE(id, sizeof(*id));
3119         RETURN(rc);
3120 }
3121
3122 /*
3123  * reads inode self id from inode EA. Probably later this should be replaced by
3124  * caching inode self id to avoid raeding it every time it is needed.
3125  */
3126 int mds_read_inode_sid(struct obd_device *obd, struct inode *inode,
3127                        struct lustre_id *id)
3128 {
3129         int rc;
3130         ENTRY;
3131
3132         LASSERT(id != NULL);
3133         LASSERT(obd != NULL);
3134         LASSERT(inode != NULL);
3135
3136         rc = fsfilt_get_md(obd, inode, &id->li_fid,
3137                            sizeof(id->li_fid), EA_SID);
3138         if (rc < 0) {
3139                 CERROR("fsfilt_get_md() failed, "
3140                        "rc = %d\n", rc);
3141                 RETURN(rc);
3142         } else if (!rc) {
3143                 rc = -ENODATA;
3144                 RETURN(rc);
3145         } else {
3146                 rc = 0;
3147         }
3148
3149         RETURN(rc);
3150 }
3151
3152 /* updates inode self id in EA. */
3153 int mds_update_inode_sid(struct obd_device *obd, struct inode *inode,
3154                          void *handle, struct lustre_id *id)
3155 {
3156         int rc = 0;
3157         ENTRY;
3158
3159         LASSERT(id != NULL);
3160         LASSERT(obd != NULL);
3161         LASSERT(inode != NULL);
3162         
3163         rc = fsfilt_set_md(obd, inode, handle, &id->li_fid,
3164                            sizeof(id->li_fid), EA_SID);
3165         if (rc) {
3166                 CERROR("fsfilt_set_md() failed, rc = %d\n", rc);
3167                 RETURN(rc);
3168         }
3169
3170         RETURN(rc);
3171 }
3172
3173 /* 
3174  * reads inode id on master MDS. This is usualy done by CMOBD to update requests
3175  * to master MDS by correct store cookie, needed to find inode on master MDS
3176  * quickly.
3177  */
3178 int mds_read_inode_mid(struct obd_device *obd, struct inode *inode,
3179                        struct lustre_id *id)
3180 {
3181         int rc;
3182         ENTRY;
3183
3184         LASSERT(id != NULL);
3185         LASSERT(obd != NULL);
3186         LASSERT(inode != NULL);
3187
3188         rc = fsfilt_get_md(obd, inode, id, sizeof(*id), EA_MID);
3189         if (rc < 0) {
3190                 CERROR("fsfilt_get_md() failed, rc = %d\n", rc);
3191                 RETURN(rc);
3192         } else if (!rc) {
3193                 rc = -ENODATA;
3194                 RETURN(rc);
3195         } else {
3196                 rc = 0;
3197         }
3198
3199         RETURN(rc);
3200 }
3201
3202 /*
3203  * updates master inode id. Usualy this is done by CMOBD after an inode is
3204  * created and relationship between cache MDS and master one should be
3205  * established.
3206  */
3207 int mds_update_inode_mid(struct obd_device *obd, struct inode *inode,
3208                          void *handle, struct lustre_id *id)
3209 {
3210         int rc = 0;
3211         ENTRY;
3212
3213         LASSERT(id != NULL);
3214         LASSERT(obd != NULL);
3215         LASSERT(inode != NULL);
3216         
3217         rc = fsfilt_set_md(obd, inode, handle, id,
3218                            sizeof(*id), EA_MID);
3219         if (rc) {
3220                 CERROR("fsfilt_set_md() failed, "
3221                        "rc = %d\n", rc);
3222                 RETURN(rc);
3223         }
3224
3225         RETURN(rc);
3226 }
3227
3228 /* mount the file system (secretly) */
3229 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
3230 {
3231         struct lustre_cfg* lcfg = buf;
3232         struct mds_obd *mds = &obd->u.mds;
3233         struct lvfs_obd_ctxt *lvfs_ctxt = NULL;
3234         char *options = NULL;
3235         struct vfsmount *mnt;
3236         char ns_name[48];
3237         unsigned long page;
3238         int rc = 0;
3239         ENTRY;
3240
3241         if (lcfg->lcfg_bufcount < 3)
3242                 RETURN(rc = -EINVAL);
3243
3244         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
3245                 RETURN(rc = -EINVAL);
3246
3247         obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
3248         if (IS_ERR(obd->obd_fsops))
3249                 RETURN(rc = PTR_ERR(obd->obd_fsops));
3250
3251         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
3252
3253         page = __get_free_page(GFP_KERNEL);
3254         if (!page)
3255                 RETURN(-ENOMEM);
3256
3257         options = (char *)page;
3258         memset(options, 0, PAGE_SIZE);
3259
3260         /*
3261          * here we use "iopen_nopriv" hardcoded, because it affects MDS utility
3262          * and the rest of options are passed by mount options. Probably this
3263          * should be moved to somewhere else like startup scripts or lconf. */
3264         sprintf(options, "iopen_nopriv");
3265         
3266         if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4))
3267                 sprintf(options + strlen(options), ",%s",
3268                         lustre_cfg_string(lcfg, 4));
3269
3270         /* we have to know mdsnum before touching underlying fs -bzzz */
3271         atomic_set(&mds->mds_open_count, 0);
3272         sema_init(&mds->mds_md_sem, 1);
3273         sema_init(&mds->mds_create_sem, 1);
3274         mds->mds_md_connected = 0;
3275         mds->mds_md_name = NULL;
3276
3277         if (LUSTRE_CFG_BUFLEN(lcfg, 5) > 0 && lustre_cfg_buf(lcfg, 5) &&
3278             strncmp(lustre_cfg_string(lcfg, 5), "dumb", LUSTRE_CFG_BUFLEN(lcfg, 5))) {
3279                 class_uuid_t uuid;
3280
3281                 generate_random_uuid(uuid);
3282                 class_uuid_unparse(uuid, &mds->mds_md_uuid);
3283
3284                 OBD_ALLOC(mds->mds_md_name, LUSTRE_CFG_BUFLEN(lcfg, 5));
3285                 if (mds->mds_md_name == NULL) 
3286                         RETURN(rc = -ENOMEM);
3287
3288                 memcpy(mds->mds_md_name, lustre_cfg_buf(lcfg, 5),
3289                        LUSTRE_CFG_BUFLEN(lcfg, 5));
3290                 
3291                 CDEBUG(D_OTHER, "MDS: %s is master for %s\n",
3292                        obd->obd_name, mds->mds_md_name);
3293
3294                 rc = mds_md_connect(obd, mds->mds_md_name);
3295                 if (rc) {
3296                         OBD_FREE(mds->mds_md_name, LUSTRE_CFG_BUFLEN(lcfg, 5));
3297                         GOTO(err_ops, rc);
3298                 }
3299         }
3300
3301         mds->mds_obd_type = MDS_MASTER_OBD;
3302
3303         if (LUSTRE_CFG_BUFLEN(lcfg, 6) > 0 && lustre_cfg_buf(lcfg, 6) &&
3304             strncmp(lustre_cfg_string(lcfg, 6), "dumb", 
3305                     LUSTRE_CFG_BUFLEN(lcfg, 6))) {
3306                 if (!memcmp(lustre_cfg_string(lcfg, 6), "master", 
3307                             strlen("master"))) {
3308                         mds->mds_obd_type = MDS_MASTER_OBD;
3309                 } else if (!memcmp(lustre_cfg_string(lcfg, 6), "cache", 
3310                                    strlen("cache"))) {
3311                         mds->mds_obd_type = MDS_CACHE_OBD;
3312                 }     
3313         }
3314
3315         rc = lvfs_mount_fs(lustre_cfg_string(lcfg, 1), 
3316                            lustre_cfg_string(lcfg, 2),
3317                            options, 0, &lvfs_ctxt);
3318
3319         free_page(page);
3320
3321         if (rc || !lvfs_ctxt) {
3322                 CERROR("lvfs_mount_fs failed: rc = %d\n", rc);
3323                 GOTO(err_ops, rc);
3324         }
3325
3326         mnt = lvfs_ctxt->loc_mnt;
3327         mds->mds_lvfs_ctxt = lvfs_ctxt;
3328         ll_clear_rdonly(ll_sbdev(mnt->mnt_sb));
3329
3330         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
3331
3332         sema_init(&mds->mds_epoch_sem, 1);
3333         atomic_set(&mds->mds_real_clients, 0);
3334         spin_lock_init(&mds->mds_transno_lock);
3335         spin_lock_init(&mds->mds_last_fid_lock);
3336         sema_init(&mds->mds_orphan_recovery_sem, 1);
3337         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
3338
3339         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
3340         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
3341
3342         if (obd->obd_namespace == NULL) {
3343                 mds_cleanup(obd, 0);
3344                 GOTO(err_put, rc = -ENOMEM);
3345         }
3346         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
3347
3348         rc = mds_fs_setup(obd, mnt);
3349         if (rc) {
3350                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
3351                        obd->obd_name, rc);
3352                 GOTO(err_ns, rc);
3353         }
3354
3355         rc = llog_start_commit_thread();
3356         if (rc < 0)
3357
3358                 GOTO(err_fs, rc);
3359
3360
3361         if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && lustre_cfg_buf(lcfg, 3) &&
3362             strncmp(lustre_cfg_string(lcfg, 3), "dumb", 
3363                     LUSTRE_CFG_BUFLEN(lcfg, 3))) {
3364                 class_uuid_t uuid;
3365
3366                 generate_random_uuid(uuid);
3367                 class_uuid_unparse(uuid, &mds->mds_dt_uuid);
3368
3369                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
3370                 if (mds->mds_profile == NULL)
3371                         GOTO(err_fs, rc = -ENOMEM);
3372
3373                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
3374                         LUSTRE_CFG_BUFLEN(lcfg, 3));
3375         }
3376
3377         /* 
3378          * setup root dir and files ID dir if lmv already connected, or there is
3379          * not lmv at all.
3380          */
3381         if (mds->mds_md_exp || (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && 
3382                                 lustre_cfg_buf(lcfg, 3) &&
3383                                 strncmp(lustre_cfg_string(lcfg, 3), "dumb", 
3384                                         LUSTRE_CFG_BUFLEN(lcfg, 3)))) {
3385                 rc = mds_fs_setup_rootid(obd);
3386                 if (rc)
3387                         GOTO(err_fs, rc);
3388
3389                 rc = mds_fs_setup_virtid(obd);
3390                 if (rc)
3391                         GOTO(err_fs, rc);
3392         }
3393
3394         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
3395                            "mds_ldlm_client", &obd->obd_ldlm_client);
3396         obd->obd_replayable = 1;
3397
3398         rc = mds_postsetup(obd);
3399         if (rc)
3400                 GOTO(err_fs, rc);
3401
3402         RETURN(0);
3403
3404 err_fs:
3405         /* No extra cleanup needed for llog_init_commit_thread() */
3406         mds_fs_cleanup(obd, 0);
3407 err_ns:
3408         ldlm_namespace_free(obd->obd_namespace, 0);
3409         obd->obd_namespace = NULL;
3410 err_put:
3411         unlock_kernel();
3412         lvfs_umount_fs(mds->mds_lvfs_ctxt);
3413         mds->mds_sb = 0;
3414         lock_kernel();
3415 err_ops:
3416         fsfilt_put_ops(obd->obd_fsops);
3417         return rc;
3418 }
3419
3420 static int mds_postsetup(struct obd_device *obd)
3421 {
3422         struct mds_obd *mds = &obd->u.mds;
3423         int rc = 0;
3424         ENTRY;
3425
3426         rc = obd_llog_setup(obd, &obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT, 
3427                             obd, 0, NULL, &llog_lvfs_ops);
3428         if (rc)
3429                 RETURN(rc);
3430
3431         if (mds->mds_profile) {
3432                 struct llog_ctxt *lgctxt;
3433                 struct lvfs_run_ctxt saved;
3434                 struct lustre_profile *lprof;
3435                 struct config_llog_instance cfg;
3436
3437                 cfg.cfg_instance = NULL;
3438                 cfg.cfg_uuid = mds->mds_dt_uuid;
3439                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3440
3441                 lgctxt = llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT);
3442                 if (!lgctxt) {
3443                         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3444                         GOTO(err_llog, rc = -EINVAL);
3445                 }
3446                 
3447                 rc = class_config_process_llog(lgctxt, mds->mds_profile, &cfg);
3448                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3449
3450                 if (rc)
3451                         GOTO(err_llog, rc);
3452
3453                 lprof = class_get_profile(mds->mds_profile);
3454                 if (lprof == NULL) {
3455                         CERROR("No profile found: %s\n", mds->mds_profile);
3456                         GOTO(err_cleanup, rc = -ENOENT);
3457                 }
3458                 rc = mds_dt_connect(obd, lprof->lp_lov);
3459                 if (rc)
3460                         GOTO(err_cleanup, rc);
3461
3462                 rc = mds_md_postsetup(obd);
3463                 if (rc)
3464                         GOTO(err_cleanup, rc);
3465         }
3466
3467         RETURN(rc);
3468 err_cleanup:
3469         mds_dt_clean(obd);
3470 err_llog:
3471         obd_llog_cleanup(llog_get_context(&obd->obd_llogs,
3472                                           LLOG_CONFIG_ORIG_CTXT));
3473         return rc;
3474 }
3475
3476 int mds_postrecov_common(struct obd_device *obd)
3477 {
3478         struct mds_obd *mds = &obd->u.mds;
3479         struct llog_ctxt *ctxt;
3480         int rc, item = 0, valsize;
3481          __u32 group;
3482         ENTRY;
3483
3484         LASSERT(!obd->obd_recovering);
3485         ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
3486         LASSERT(ctxt != NULL);
3487
3488         /* clean PENDING dir */
3489         rc = mds_cleanup_orphans(obd);
3490         if (rc < 0)
3491                 GOTO(out, rc);
3492         item = rc;
3493
3494         group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
3495         valsize = sizeof(group);
3496         rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"),
3497                           "mds_conn", valsize, &group);
3498         if (rc)
3499                 GOTO(out, rc);
3500
3501         rc = llog_connect(ctxt, obd->u.mds.mds_dt_desc.ld_tgt_count,
3502                           NULL, NULL, NULL);
3503         if (rc) {
3504                 CERROR("%s: failed at llog_origin_connect: %d\n", 
3505                        obd->obd_name, rc);
3506                 GOTO(out, rc);
3507         }
3508
3509         /* remove the orphaned precreated objects */
3510         rc = mds_dt_clear_orphans(mds, NULL /* all OSTs */);
3511         if (rc)
3512                 GOTO(err_llog, rc);
3513
3514 out:
3515         RETURN(rc < 0 ? rc : item);
3516
3517 err_llog:
3518         /* cleanup all llogging subsystems */
3519         rc = obd_llog_finish(obd, &obd->obd_llogs,
3520                              mds->mds_dt_desc.ld_tgt_count);
3521         if (rc)
3522                 CERROR("%s: failed to cleanup llogging subsystems\n",
3523                         obd->obd_name);
3524         goto out;
3525 }
3526
3527 int mds_postrecov(struct obd_device *obd)
3528 {
3529         int rc;
3530         ENTRY;
3531         rc = mds_postrecov_common(obd);
3532         if (rc == 0)
3533                 rc = mds_md_reconnect(obd);
3534         RETURN(rc);
3535 }
3536
3537 int mds_dt_clean(struct obd_device *obd)
3538 {
3539         struct mds_obd *mds = &obd->u.mds;
3540         ENTRY;
3541
3542         if (mds->mds_profile) {
3543                 char * cln_prof;
3544                 struct llog_ctxt *llctx;
3545                 struct lvfs_run_ctxt saved;
3546                 struct config_llog_instance cfg;
3547                 int len = strlen(mds->mds_profile) + sizeof("-clean") + 1;
3548
3549                 OBD_ALLOC(cln_prof, len);
3550                 sprintf(cln_prof, "%s-clean", mds->mds_profile);
3551
3552                 cfg.cfg_instance = NULL;
3553                 cfg.cfg_uuid = mds->mds_dt_uuid;
3554
3555                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3556                 llctx = llog_get_context(&obd->obd_llogs,
3557                                          LLOG_CONFIG_ORIG_CTXT);
3558                 class_config_process_llog(llctx, cln_prof, &cfg);
3559                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
3560
3561                 OBD_FREE(cln_prof, len);
3562                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
3563                 mds->mds_profile = NULL;
3564         }
3565         RETURN(0);
3566 }
3567
3568 int mds_md_clean(struct obd_device *obd)
3569 {
3570         struct mds_obd *mds = &obd->u.mds;
3571         ENTRY;
3572
3573         if (mds->mds_md_name) {
3574                 OBD_FREE(mds->mds_md_name, strlen(mds->mds_md_name) + 1);
3575                 mds->mds_md_name = NULL;
3576         }
3577         RETURN(0);
3578 }
3579
3580 static int mds_precleanup(struct obd_device *obd, int flags)
3581 {
3582         int rc = 0;
3583         ENTRY;
3584
3585         mds_md_clean(obd);
3586         mds_dt_disconnect(obd, flags);
3587         mds_dt_clean(obd);
3588         obd_llog_cleanup(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT));
3589         RETURN(rc);
3590 }
3591
3592 extern void lgss_svc_cache_purge_all(void);
3593 static int mds_cleanup(struct obd_device *obd, int flags)
3594 {
3595         struct mds_obd *mds = &obd->u.mds;
3596         ENTRY;
3597
3598         if (mds->mds_sb == NULL)
3599                 RETURN(0);
3600
3601         mds_update_server_data(obd, 1);
3602         mds_update_last_fid(obd, NULL, 1);
3603         
3604         if (mds->mds_dt_objids != NULL) {
3605                 int size = mds->mds_dt_desc.ld_tgt_count *
3606                         sizeof(obd_id);
3607                 OBD_FREE(mds->mds_dt_objids, size);
3608         }
3609         mds_fs_cleanup(obd, flags);
3610
3611         unlock_kernel();
3612
3613         /* 2 seems normal on mds, (may_umount() also expects 2
3614           fwiw), but we only see 1 at this point in obdfilter. */
3615         lvfs_umount_fs(mds->mds_lvfs_ctxt);
3616
3617         mds->mds_sb = 0;
3618
3619         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
3620
3621         spin_lock_bh(&obd->obd_processing_task_lock);
3622         if (obd->obd_recovering) {
3623                 target_cancel_recovery_timer(obd);
3624                 obd->obd_recovering = 0;
3625         }
3626         spin_unlock_bh(&obd->obd_processing_task_lock);
3627
3628         lock_kernel();
3629         fsfilt_put_ops(obd->obd_fsops);
3630
3631 #ifdef ENABLE_GSS
3632         /* XXX */
3633         lgss_svc_cache_purge_all();
3634 #endif
3635
3636         spin_lock(&mds->mds_denylist_lock);
3637         while (!list_empty( &mds->mds_denylist ) ) {
3638                 deny_sec_t *p_deny_sec = list_entry(mds->mds_denylist.next,
3639                                                     deny_sec_t, list);
3640                 list_del(&p_deny_sec->list);
3641                 OBD_FREE(p_deny_sec, sizeof(*p_deny_sec));
3642         }
3643         spin_unlock(&mds->mds_denylist_lock);
3644         if(mds->mds_mds_sec)
3645                 OBD_FREE(mds->mds_mds_sec, strlen(mds->mds_mds_sec) + 1);
3646         if(mds->mds_ost_sec)
3647                 OBD_FREE(mds->mds_ost_sec, strlen(mds->mds_ost_sec) + 1);
3648
3649         RETURN(0);
3650 }
3651
3652 static int set_security(const char *value, char **sec)
3653 {
3654         int rc = 0;
3655
3656         if (!strcmp(value, "null") ||
3657             !strcmp(value, "krb5i") ||
3658             !strcmp(value, "krb5p")) {
3659                 OBD_ALLOC(*sec, strlen(value) + 1);
3660                 if(!*sec)
3661                         RETURN(-ENOMEM);
3662                 memcpy(*sec, value, strlen(value) + 1);
3663         } else {
3664                 CERROR("Unrecognized value, force use NULL\n");
3665                 rc = -EINVAL;
3666         }
3667
3668         return rc;
3669 }
3670
3671 static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
3672 {
3673         struct lustre_cfg *lcfg = buf;
3674         struct mds_obd *mds = &obd->u.mds;
3675         int rc = 0;
3676         ENTRY;
3677
3678         switch(lcfg->lcfg_command) {
3679         case LCFG_SET_SECURITY: {
3680                 if ((LUSTRE_CFG_BUFLEN(lcfg, 1) == 0) ||
3681                     (LUSTRE_CFG_BUFLEN(lcfg, 2) == 0))
3682                         GOTO(out, rc = -EINVAL);
3683
3684                 if (!strcmp(lustre_cfg_string(lcfg, 1), "mds_sec"))
3685                         rc = set_security(lustre_cfg_string(lcfg, 2),
3686                                           &mds->mds_mds_sec);
3687                 else if (!strcmp(lustre_cfg_string(lcfg, 1), "oss_sec"))
3688                         rc = set_security(lustre_cfg_string(lcfg, 2),
3689                                           &mds->mds_ost_sec);
3690                 else if (!strcmp(lustre_cfg_string(lcfg, 1), "deny_sec")){
3691                         spin_lock(&mds->mds_denylist_lock);
3692                         rc = add_deny_security(lustre_cfg_string(lcfg, 2),
3693                                                &mds->mds_denylist);
3694                         spin_unlock(&mds->mds_denylist_lock);
3695                 } else {
3696                         CERROR("Unrecognized key\n");
3697                         rc = -EINVAL;
3698                 }
3699                 break;
3700         }
3701         default: {
3702                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
3703                 GOTO(out, rc = -EINVAL);
3704
3705         }
3706         }
3707 out:
3708         RETURN(rc);
3709 }
3710
3711 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
3712                                         int offset,
3713                                         struct ldlm_lock *new_lock,
3714                                         struct ldlm_lock **old_lock,
3715                                         struct lustre_handle *lockh)
3716 {
3717         struct obd_export *exp = req->rq_export;
3718         struct obd_device *obd = exp->exp_obd;
3719         struct ldlm_request *dlmreq =
3720                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
3721         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
3722         struct list_head *iter;
3723
3724         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
3725                 return;
3726
3727         l_lock(&obd->obd_namespace->ns_lock);
3728         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
3729                 struct ldlm_lock *lock;
3730                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
3731                 if (lock == new_lock)
3732                         continue;
3733                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
3734                         lockh->cookie = lock->l_handle.h_cookie;
3735                         LDLM_DEBUG(lock, "restoring lock cookie");
3736                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
3737                                   lockh->cookie);
3738                         if (old_lock)
3739                                 *old_lock = LDLM_LOCK_GET(lock);
3740                         l_unlock(&obd->obd_namespace->ns_lock);
3741                         return;
3742                 }
3743         }
3744         l_unlock(&obd->obd_namespace->ns_lock);
3745
3746         /* If the xid matches, then we know this is a resent request,
3747          * and allow it. (It's probably an OPEN, for which we don't
3748          * send a lock */
3749         if (req->rq_xid == 
3750             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
3751                 return;
3752
3753         if (req->rq_xid == 
3754             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid))
3755                 return;
3756
3757         /* This remote handle isn't enqueued, so we never received or
3758          * processed this request.  Clear MSG_RESENT, because it can
3759          * be handled like any normal request now. */
3760
3761         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
3762
3763         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
3764                   remote_hdl.cookie);
3765 }
3766
3767 int intent_disposition(struct ldlm_reply *rep, int flag)
3768 {
3769         if (!rep)
3770                 return 0;
3771         return (rep->lock_policy_res1 & flag);
3772 }
3773
3774 void intent_set_disposition(struct ldlm_reply *rep, int flag)
3775 {
3776         if (!rep)
3777                 return;
3778         rep->lock_policy_res1 |= flag;
3779 }
3780
3781 static int mds_intent_policy(struct ldlm_namespace *ns,
3782                              struct ldlm_lock **lockp, void *req_cookie,
3783                              ldlm_mode_t mode, int flags, void *data)
3784 {
3785         struct ptlrpc_request *req = req_cookie;
3786         struct ldlm_lock *lock = *lockp;
3787         struct ldlm_intent *it;
3788         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
3789         struct ldlm_reply *rep;
3790         struct lustre_handle lockh[2] = {{0}, {0}};
3791         struct ldlm_lock *new_lock = NULL;
3792         int getattr_part = MDS_INODELOCK_UPDATE;
3793         int rc, reply_buffers;
3794         int repsize[5] = {sizeof(struct ldlm_reply),
3795                           sizeof(struct mds_body),
3796                           mds->mds_max_mdsize};
3797
3798         int offset = MDS_REQ_INTENT_REC_OFF; 
3799         ENTRY;
3800
3801         LASSERT(req != NULL);
3802         MD_COUNTER_INCREMENT(req->rq_export->exp_obd, intent_lock);
3803
3804         if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
3805                 /* No intent was provided */
3806                 int size = sizeof(struct ldlm_reply);
3807                 rc = lustre_pack_reply(req, 1, &size, NULL);
3808                 LASSERT(rc == 0);
3809                 RETURN(0);
3810         }
3811
3812         it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
3813                                 lustre_swab_ldlm_intent);
3814         if (it == NULL) {
3815                 CERROR("Intent missing\n");
3816                 RETURN(req->rq_status = -EFAULT);
3817         }
3818
3819         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
3820
3821         reply_buffers = 3;
3822         if (it->opc & ( IT_OPEN | IT_GETATTR | IT_LOOKUP | IT_CHDIR )) {
3823                 reply_buffers = 5;
3824                 repsize[3] = 4;
3825                 repsize[4] = xattr_acl_size(LL_ACL_MAX_ENTRIES);
3826         }
3827
3828         rc = lustre_pack_reply(req, reply_buffers, repsize, NULL);
3829         if (rc)
3830                 RETURN(req->rq_status = rc);
3831
3832         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
3833         LASSERT(rep != NULL);
3834
3835         intent_set_disposition(rep, DISP_IT_EXECD);
3836
3837         /* execute policy */
3838         switch ((long)it->opc) {
3839         case IT_OPEN:
3840         case IT_CREAT|IT_OPEN:
3841                 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
3842                                             lock, NULL, lockh);
3843                 /* XXX swab here to assert that an mds_open reint
3844                  * packet is following */
3845                 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, 
3846                                             lock, NULL, lockh);
3847                 rep->lock_policy_res2 = mds_reint(req, offset, lockh);
3848
3849                 if (rep->lock_policy_res2) {
3850                         /* 
3851                          * mds_open() returns ENOLCK where it should return
3852                          * zero, but it has no lock to return.
3853                          */
3854                         if (rep->lock_policy_res2 == ENOLCK)
3855                                 rep->lock_policy_res2 = 0;
3856
3857                         RETURN(ELDLM_LOCK_ABORTED);
3858                 }
3859                 
3860                 /*
3861                  * IT_OPEN may return lock on cross-node dentry that we want to
3862                  * hold during attr retrival -bzzz
3863                  */
3864                 if (lockh[0].cookie == 0)
3865                         RETURN(ELDLM_LOCK_ABORTED);
3866                 
3867                 break;
3868         case IT_LOOKUP:
3869                 getattr_part = MDS_INODELOCK_LOOKUP;
3870         case IT_CHDIR:
3871         case IT_GETATTR:
3872                 getattr_part |= MDS_INODELOCK_LOOKUP;
3873         case IT_READDIR:
3874                 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, 
3875                                             lock, &new_lock, lockh);
3876                 rep->lock_policy_res2 = mds_getattr_lock(req, offset, lockh,
3877                                                          getattr_part);
3878                 /* FIXME: LDLM can set req->rq_status. MDS sets
3879                    policy_res{1,2} with disposition and status.
3880                    - replay: returns 0 & req->status is old status
3881                    - otherwise: returns req->status */
3882                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
3883                         rep->lock_policy_res2 = 0;
3884                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
3885                     rep->lock_policy_res2)
3886                         RETURN(ELDLM_LOCK_ABORTED);
3887                 if (req->rq_status != 0) {
3888                         LBUG();
3889                         rep->lock_policy_res2 = req->rq_status;
3890                         RETURN(ELDLM_LOCK_ABORTED);
3891                 }
3892                 break;
3893         case IT_UNLINK:
3894                 rc = mds_lock_and_check_slave(offset, req, lockh);
3895                 if ((rep->lock_policy_res2 = rc)) {
3896                         if (rc == ENOLCK)
3897                                 rep->lock_policy_res2 = 0;
3898                         RETURN(ELDLM_LOCK_ABORTED);
3899                 }
3900                 break;
3901         default:
3902                 CERROR("Unhandled intent "LPD64"\n", it->opc);
3903                 LBUG();
3904         }
3905
3906         /* By this point, whatever function we called above must have either
3907          * filled in 'lockh', been an intent replay, or returned an error.  We
3908          * want to allow replayed RPCs to not get a lock, since we would just
3909          * drop it below anyways because lock replay is done separately by the
3910          * client afterwards.  For regular RPCs we want to give the new lock to
3911          * the client instead of whatever lock it was about to get. */
3912         if (new_lock == NULL)
3913                 new_lock = ldlm_handle2lock(&lockh[0]);
3914         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
3915                 RETURN(0);
3916
3917         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
3918                  it->opc, lockh[0].cookie);
3919
3920         /* If we've already given this lock to a client once, then we should
3921          * have no readers or writers.  Otherwise, we should have one reader
3922          * _or_ writer ref (which will be zeroed below) before returning the
3923          * lock to a client. */
3924         if (new_lock->l_export == req->rq_export) {
3925                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
3926         } else {
3927                 LASSERT(new_lock->l_export == NULL);
3928                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
3929         }
3930
3931         *lockp = new_lock;
3932
3933         if (new_lock->l_export == req->rq_export) {
3934                 /* Already gave this to the client, which means that we
3935                  * reconstructed a reply. */
3936                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
3937                         MSG_RESENT);
3938                 RETURN(ELDLM_LOCK_REPLACED);
3939         }
3940
3941         /* Fixup the lock to be given to the client */
3942         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
3943         new_lock->l_readers = 0;
3944         new_lock->l_writers = 0;
3945
3946         new_lock->l_export = class_export_get(req->rq_export);
3947         list_add(&new_lock->l_export_chain,
3948                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
3949
3950         new_lock->l_blocking_ast = lock->l_blocking_ast;
3951         new_lock->l_completion_ast = lock->l_completion_ast;
3952
3953         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
3954                sizeof(lock->l_remote_handle));
3955
3956         new_lock->l_flags &= ~LDLM_FL_LOCAL;
3957
3958         LDLM_LOCK_PUT(new_lock);
3959         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
3960
3961         RETURN(ELDLM_LOCK_REPLACED);
3962 }
3963
3964 int mds_attach(struct obd_device *dev, obd_count len, void *data)
3965 {
3966         struct lprocfs_static_vars lvars;
3967         int rc = 0;
3968         struct mds_obd *mds = &dev->u.mds;
3969
3970         spin_lock_init(&mds->mds_denylist_lock);
3971         INIT_LIST_HEAD(&mds->mds_denylist);
3972
3973         lprocfs_init_multi_vars(0, &lvars);
3974
3975         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
3976         if (rc)
3977                 return rc;
3978
3979         return lprocfs_alloc_md_stats(dev, 0);
3980 }
3981
/*
 * Undo mds_attach(): free the metadata statistics first, then remove the
 * device's lprocfs registration.  Returns whatever lprocfs_obd_detach()
 * returns.
 */
int mds_detach(struct obd_device *dev)
{
        int rc;

        lprocfs_free_md_stats(dev);
        rc = lprocfs_obd_detach(dev);

        return rc;
}
3987
3988 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
3989 {
3990         struct lprocfs_static_vars lvars;
3991
3992         lprocfs_init_multi_vars(1, &lvars);
3993         return lprocfs_obd_attach(dev, lvars.obd_vars);
3994 }
3995
/*
 * Undo mdt_attach(): remove the device's lprocfs registration and hand
 * back the result of lprocfs_obd_detach().
 */
int mdt_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
4000
4001 static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
4002 {
4003         struct mds_obd *mds = &obd->u.mds;
4004         int rc = 0;
4005         ENTRY;
4006
4007         mds->mds_service =
4008                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
4009                                 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
4010                                 MDS_SERVICE_WATCHDOG_TIMEOUT,
4011                                 mds_handle, "mds", obd->obd_proc_entry);
4012
4013         if (!mds->mds_service) {
4014                 CERROR("failed to start service\n");
4015                 RETURN(-ENOMEM);
4016         }
4017
4018         rc = ptlrpc_start_n_threads(obd, mds->mds_service, MDT_NUM_THREADS,
4019                                     "ll_mdt");
4020         if (rc)
4021                 GOTO(err_thread, rc);
4022
4023         mds->mds_setattr_service =
4024                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
4025                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
4026                                 MDS_SERVICE_WATCHDOG_TIMEOUT,
4027                                 mds_handle, "mds_setattr",
4028                                 obd->obd_proc_entry);
4029         if (!mds->mds_setattr_service) {
4030                 CERROR("failed to start getattr service\n");
4031                 GOTO(err_thread, rc = -ENOMEM);
4032         }
4033
4034         rc = ptlrpc_start_n_threads(obd, mds->mds_setattr_service,
4035                                     MDT_NUM_THREADS, "ll_mdt_attr");
4036         if (rc)
4037                 GOTO(err_thread2, rc);
4038
4039         mds->mds_readpage_service =
4040                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
4041                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
4042                                 MDS_SERVICE_WATCHDOG_TIMEOUT,
4043                                 mds_handle, "mds_readpage",
4044                                 obd->obd_proc_entry);
4045         if (!mds->mds_readpage_service) {
4046                 CERROR("failed to start readpage service\n");
4047                 GOTO(err_thread2, rc = -ENOMEM);
4048         }
4049
4050         rc = ptlrpc_start_n_threads(obd, mds->mds_readpage_service,
4051                                     MDT_NUM_THREADS, "ll_mdt_rdpg");
4052
4053         if (rc)
4054                 GOTO(err_thread3, rc);
4055
4056         RETURN(0);
4057
4058 err_thread3:
4059         ptlrpc_unregister_service(mds->mds_readpage_service);
4060 err_thread2:
4061         ptlrpc_unregister_service(mds->mds_setattr_service);
4062 err_thread:
4063         ptlrpc_unregister_service(mds->mds_service);
4064         return rc;
4065 }
4066
4067 static int mdt_cleanup(struct obd_device *obd, int flags)
4068 {
4069         struct mds_obd *mds = &obd->u.mds;
4070         ENTRY;
4071
4072         ptlrpc_stop_all_threads(mds->mds_readpage_service);
4073         ptlrpc_unregister_service(mds->mds_readpage_service);
4074
4075         ptlrpc_stop_all_threads(mds->mds_setattr_service);
4076         ptlrpc_unregister_service(mds->mds_setattr_service);
4077
4078         ptlrpc_stop_all_threads(mds->mds_service);
4079         ptlrpc_unregister_service(mds->mds_service);
4080
4081         RETURN(0);
4082 }
4083
4084 static struct dentry *mds_lvfs_id2dentry(__u64 ino, __u32 gen,
4085                                          __u64 gr, void *data)
4086 {
4087         struct lustre_id id;
4088         struct obd_device *obd = data;
4089         
4090         id_ino(&id) = ino;
4091         id_gen(&id) = gen;
4092         return mds_id2dentry(obd, &id, NULL);
4093 }
4094
4095 static int mds_get_info(struct obd_export *exp, __u32 keylen,
4096                         void *key, __u32 *valsize, void *val)
4097 {
4098         struct obd_device *obd;
4099         struct mds_obd *mds;
4100         ENTRY;
4101
4102         obd = class_exp2obd(exp);
4103         mds = &obd->u.mds;
4104         
4105         if (obd == NULL) {
4106                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
4107                        exp->exp_handle.h_cookie);
4108                 RETURN(-EINVAL);
4109         }
4110
4111         if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
4112                 /* get log_context handle. */
4113                 unsigned long *llh_handle = val;
4114                 *valsize = sizeof(unsigned long);
4115                 *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
4116                 RETURN(0);
4117         }
4118         if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
4119                 /* get log_context handle. */
4120                 unsigned long *sb = val;
4121                 *valsize = sizeof(unsigned long);
4122                 *sb = (unsigned long)obd->u.mds.mds_sb;
4123                 RETURN(0);
4124         }
4125
4126         if (keylen >= strlen("mdsize") && memcmp(key, "mdsize", keylen) == 0) {
4127                 __u32 *mdsize = val;
4128                 *valsize = sizeof(*mdsize);
4129                 *mdsize = mds->mds_max_mdsize;
4130                 RETURN(0);
4131         }
4132
4133         if (keylen >= strlen("mdsnum") && strcmp(key, "mdsnum") == 0) {
4134                 __u32 *mdsnum = val;
4135                 *valsize = sizeof(*mdsnum);
4136                 *mdsnum = mds->mds_num;
4137                 RETURN(0);
4138         }
4139
4140         if (keylen >= strlen("rootid") && strcmp(key, "rootid") == 0) {
4141                 struct lustre_id *rootid = val;
4142                 *valsize = sizeof(struct lustre_id);
4143                 *rootid = mds->mds_rootid;
4144                 RETURN(0);
4145         }
4146
4147         CDEBUG(D_IOCTL, "invalid key\n");
4148         RETURN(-EINVAL);
4149
4150 }
4151 struct lvfs_callback_ops mds_lvfs_ops = {
4152         l_id2dentry:     mds_lvfs_id2dentry,
4153 };
4154
/*
 * Forward declarations of the preprw/commitrw handlers that are installed
 * in mds_obd_ops as o_preprw and o_commitrw.
 */
int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
               int objcount, struct obd_ioobj *obj,
               int niocount, struct niobuf_remote *nb,
               struct niobuf_local *res, struct obd_trans_info *oti);

int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                 int objcount, struct obd_ioobj *obj,
                 int niocount, struct niobuf_local *res,
                 struct obd_trans_info *oti, int rc);
4165
4166 /* use obd ops to offer management infrastructure */
4167 static struct obd_ops mds_obd_ops = {
4168         .o_owner           = THIS_MODULE,
4169         .o_attach          = mds_attach,
4170         .o_detach          = mds_detach,
4171         .o_connect         = mds_connect,
4172         .o_connect_post    = mds_connect_post,
4173         .o_init_export     = mds_init_export,
4174         .o_destroy_export  = mds_destroy_export,
4175         .o_disconnect      = mds_disconnect,
4176         .o_setup           = mds_setup,
4177         .o_precleanup      = mds_precleanup,
4178         .o_cleanup         = mds_cleanup,
4179         .o_process_config  = mds_process_config,
4180         .o_postrecov       = mds_postrecov,
4181         .o_statfs          = mds_obd_statfs,
4182         .o_iocontrol       = mds_iocontrol,
4183         .o_create          = mds_obd_create,
4184         .o_destroy         = mds_obd_destroy,
4185         .o_llog_init       = mds_llog_init,
4186         .o_llog_finish     = mds_llog_finish,
4187         .o_notify          = mds_notify,
4188         .o_get_info        = mds_get_info,
4189         .o_set_info        = mds_set_info,
4190         .o_preprw          = mds_preprw, 
4191         .o_commitrw        = mds_commitrw,
4192 };
4193
4194 static struct obd_ops mdt_obd_ops = {
4195         .o_owner           = THIS_MODULE,
4196         .o_attach          = mdt_attach,
4197         .o_detach          = mdt_detach,
4198         .o_setup           = mdt_setup,
4199         .o_cleanup         = mdt_cleanup,
4200 };
4201
4202 static int __init mds_init(void)
4203 {
4204         struct lprocfs_static_vars lvars;
4205
4206         mds_init_lsd_cache();
4207
4208         lprocfs_init_multi_vars(0, &lvars);
4209         class_register_type(&mds_obd_ops, NULL, lvars.module_vars,
4210                             LUSTRE_MDS_NAME);
4211         lprocfs_init_multi_vars(1, &lvars);
4212         class_register_type(&mdt_obd_ops, NULL, lvars.module_vars,
4213                             LUSTRE_MDT_NAME);
4214
4215         return 0;
4216 }
4217
4218 static void /*__exit*/ mds_exit(void)
4219 {
4220         mds_cleanup_lsd_cache();
4221
4222         class_unregister_type(LUSTRE_MDS_NAME);
4223         class_unregister_type(LUSTRE_MDT_NAME);
4224 }
4225
4226 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4227 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
4228 MODULE_LICENSE("GPL");
4229
4230 module_init(mds_init);
4231 module_exit(mds_exit);