Whamcloud - gitweb
mdt prototype changes
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /*
41  * LUSTRE_VERSION_CODE
42  */
43 #include <linux/lustre_ver.h>
44 /*
45  * struct OBD_{ALLOC,FREE}*()
46  * OBD_FAIL_CHECK
47  */
48 #include <linux/obd_support.h>
49
50 #include <linux/lu_object.h>
51
52 #include "mdt.h"
53
/*
 * MDT thread count (judging by the name -- confirm at the point of use).
 * The value is assigned in mdt_mod_init().
 */
unsigned long mdt_num_threads;
58
59 static int mdt_getstatus(struct mdt_thread_info *info,
60                          struct ptlrpc_request *req, int offset)
61 {
62         struct md_device *mdd  = info->mti_mdt->mdt_child;
63         struct mds_body  *body;
64         int               size = sizeof *body;
65         int               result;
66
67         ENTRY;
68
69         result = lustre_pack_reply(req, 1, &size, NULL);
70         if (result)
71                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
72                        size);
73         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
74                 result = -ENOMEM;
75         else {
76                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
77                 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
78         }
79
80         /* the last_committed and last_xid fields are filled in for all
81          * replies already - no need to do so here also.
82          */
83         RETURN(result);
84 }
85
86 /*
87  * struct obd_device
88  */
89 #include <linux/obd.h>
90 /*
91  * struct class_connect()
92  */
93 #include <linux/obd_class.h>
94 /*
95  * struct obd_export
96  */
97 #include <linux/lustre_export.h>
98 /*
99  * struct mds_client_data
100  */
101 #include <../mds/mds_internal.h>
102 #include <linux/lustre_mds.h>
103 #include <linux/lustre_fsfilt.h>
104 #include <linux/lprocfs_status.h>
105 #include <linux/lustre_commit_confd.h>
106 #include <linux/lustre_quota.h>
107 #include <linux/lustre_disk.h>
108 #include <linux/lustre_ver.h>
109
110 static int mds_intent_policy(struct ldlm_namespace *ns,
111                              struct ldlm_lock **lockp, void *req_cookie,
112                              ldlm_mode_t mode, int flags, void *data);
113 static int mds_postsetup(struct obd_device *obd);
114 static int mds_cleanup(struct obd_device *obd);
115
116 /* Assumes caller has already pushed into the kernel filesystem context */
117 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
118                         loff_t offset, int count)
119 {
120         struct ptlrpc_bulk_desc *desc;
121         struct l_wait_info lwi;
122         struct page **pages;
123         int rc = 0, npages, i, tmpcount, tmpsize = 0;
124         ENTRY;
125
126         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
127
128         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
129         OBD_ALLOC(pages, sizeof(*pages) * npages);
130         if (!pages)
131                 GOTO(out, rc = -ENOMEM);
132
133         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
134                                     MDS_BULK_PORTAL);
135         if (desc == NULL)
136                 GOTO(out_free, rc = -ENOMEM);
137
138         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
139                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
140
141                 pages[i] = alloc_pages(GFP_KERNEL, 0);
142                 if (pages[i] == NULL)
143                         GOTO(cleanup_buf, rc = -ENOMEM);
144
145                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
146         }
147
148         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
149                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
150                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
151                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
152                        file->f_dentry->d_inode->i_size);
153
154                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
155                                      kmap(pages[i]), tmpsize, &offset);
156                 kunmap(pages[i]);
157
158                 if (rc != tmpsize)
159                         GOTO(cleanup_buf, rc = -EIO);
160         }
161
162         LASSERT(desc->bd_nob == count);
163
164         rc = ptlrpc_start_bulk_transfer(desc);
165         if (rc)
166                 GOTO(cleanup_buf, rc);
167
168         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
169                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
170                        OBD_FAIL_MDS_SENDPAGE, rc);
171                 GOTO(abort_bulk, rc);
172         }
173
174         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
175         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
176         LASSERT (rc == 0 || rc == -ETIMEDOUT);
177
178         if (rc == 0) {
179                 if (desc->bd_success &&
180                     desc->bd_nob_transferred == count)
181                         GOTO(cleanup_buf, rc);
182
183                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
184         }
185
186         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
187                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
188                   desc->bd_nob_transferred, count,
189                   req->rq_export->exp_client_uuid.uuid,
190                   req->rq_export->exp_connection->c_remote_uuid.uuid);
191
192         class_fail_export(req->rq_export);
193
194         EXIT;
195  abort_bulk:
196         ptlrpc_abort_bulk (desc);
197  cleanup_buf:
198         for (i = 0; i < npages; i++)
199                 if (pages[i])
200                         __free_pages(pages[i], 0);
201
202         ptlrpc_free_bulk(desc);
203  out_free:
204         OBD_FREE(pages, sizeof(*pages) * npages);
205  out:
206         return rc;
207 }
208
209 /* only valid locked dentries or errors should be returned */
210 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
211                                      struct vfsmount **mnt, int lock_mode,
212                                      struct lustre_handle *lockh,
213                                      char *name, int namelen, __u64 lockpart)
214 {
215         struct mds_obd *mds = &obd->u.mds;
216         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
217         struct ldlm_res_id res_id = { .name = {0} };
218         int flags = 0, rc;
219         ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
220         ENTRY;
221
222         if (IS_ERR(de))
223                 RETURN(de);
224
225         res_id.name[0] = de->d_inode->i_ino;
226         res_id.name[1] = de->d_inode->i_generation;
227         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
228                               LDLM_IBITS, &policy, lock_mode, &flags,
229                               ldlm_blocking_ast, ldlm_completion_ast,
230                               NULL, NULL, NULL, 0, NULL, lockh);
231         if (rc != ELDLM_OK) {
232                 l_dput(de);
233                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
234         }
235
236         RETURN(retval);
237 }
238
239 /* Look up an entry by inode number. */
240 /* this function ONLY returns valid dget'd dentries with an initialized inode
241    or errors */
242 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
243                               struct vfsmount **mnt)
244 {
245         char fid_name[32];
246         unsigned long ino = fid->id;
247         __u32 generation = fid->generation;
248         struct inode *inode;
249         struct dentry *result;
250
251         if (ino == 0)
252                 RETURN(ERR_PTR(-ESTALE));
253
254         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
255
256         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
257                ino, generation, mds->mds_obt.obt_sb);
258
259         /* under ext3 this is neither supposed to return bad inodes
260            nor NULL inodes. */
261         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
262         if (IS_ERR(result))
263                 RETURN(result);
264
265         inode = result->d_inode;
266         if (!inode)
267                 RETURN(ERR_PTR(-ENOENT));
268
269         if (inode->i_generation == 0 || inode->i_nlink == 0) {
270                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
271                               " may indicate disk corruption (inode: %lu, link:"
272                               " %lu, count: %d)\n", inode->i_ino,
273                               (unsigned long)inode->i_nlink,
274                               atomic_read(&inode->i_count));
275                 dput(result);
276                 RETURN(ERR_PTR(-ENOENT));
277         }
278
279         if (generation && inode->i_generation != generation) {
280                 /* we didn't find the right inode.. */
281                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
282                        "count: %d, generation %u/%u\n", inode->i_ino,
283                        (unsigned long)inode->i_nlink,
284                        atomic_read(&inode->i_count), inode->i_generation,
285                        generation);
286                 dput(result);
287                 RETURN(ERR_PTR(-ENOENT));
288         }
289
290         if (mnt) {
291                 *mnt = mds->mds_vfsmnt;
292                 mntget(*mnt);
293         }
294
295         RETURN(result);
296 }
297
298 static int mds_connect_internal(struct obd_export *exp,
299                                 struct obd_connect_data *data)
300 {
301         struct obd_device *obd = exp->exp_obd;
302         if (data != NULL) {
303                 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
304                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
305
306                 /* If no known bits (which should not happen, probably,
307                    as everybody should support LOOKUP and UPDATE bits at least)
308                    revert to compat mode with plain locks. */
309                 if (!data->ocd_ibits_known &&
310                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
311                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
312
313                 if (!obd->u.mds.mds_fl_acl)
314                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
315
316                 if (!obd->u.mds.mds_fl_user_xattr)
317                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
318
319                 exp->exp_connect_flags = data->ocd_connect_flags;
320                 data->ocd_version = LUSTRE_VERSION_CODE;
321                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
322         }
323
324         if (obd->u.mds.mds_fl_acl &&
325             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
326                 CWARN("%s: MDS requires ACL support but client does not\n",
327                       obd->obd_name);
328                 return -EBADE;
329         }
330         return 0;
331 }
332
333 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
334                          struct obd_uuid *cluuid,
335                          struct obd_connect_data *data)
336 {
337         int rc;
338         ENTRY;
339
340         if (exp == NULL || obd == NULL || cluuid == NULL)
341                 RETURN(-EINVAL);
342
343         rc = mds_connect_internal(exp, data);
344
345         RETURN(rc);
346 }
347
348 /* Establish a connection to the MDS.
349  *
350  * This will set up an export structure for the client to hold state data
351  * about that client, like open files, the last operation number it did
352  * on the server, etc.
353  */
354 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
355                        struct obd_uuid *cluuid, struct obd_connect_data *data)
356 {
357         struct obd_export *exp;
358         struct mds_export_data *med;
359         struct mds_client_data *mcd = NULL;
360         int rc, abort_recovery;
361         ENTRY;
362
363         if (!conn || !obd || !cluuid)
364                 RETURN(-EINVAL);
365
366         /* Check for aborted recovery. */
367         spin_lock_bh(&obd->obd_processing_task_lock);
368         abort_recovery = obd->obd_abort_recovery;
369         spin_unlock_bh(&obd->obd_processing_task_lock);
370         if (abort_recovery)
371                 target_abort_recovery(obd);
372
373         /* XXX There is a small race between checking the list and adding a
374          * new connection for the same UUID, but the real threat (list
375          * corruption when multiple different clients connect) is solved.
376          *
377          * There is a second race between adding the export to the list,
378          * and filling in the client data below.  Hence skipping the case
379          * of NULL mcd above.  We should already be controlling multiple
380          * connects at the client, and we can't hold the spinlock over
381          * memory allocations without risk of deadlocking.
382          */
383         rc = class_connect(conn, obd, cluuid);
384         if (rc)
385                 RETURN(rc);
386         exp = class_conn2export(conn);
387         LASSERT(exp);
388         med = &exp->exp_mds_data;
389
390         rc = mds_connect_internal(exp, data);
391         if (rc)
392                 GOTO(out, rc);
393
394         OBD_ALLOC(mcd, sizeof(*mcd));
395         if (!mcd)
396                 GOTO(out, rc = -ENOMEM);
397
398         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
399         med->med_mcd = mcd;
400
401         rc = mds_client_add(obd, &obd->u.mds, med, -1);
402         GOTO(out, rc);
403
404 out:
405         if (rc) {
406                 if (mcd) {
407                         OBD_FREE(mcd, sizeof(*mcd));
408                         med->med_mcd = NULL;
409                 }
410                 class_disconnect(exp);
411         } else {
412                 class_export_put(exp);
413         }
414
415         RETURN(rc);
416 }
417
418 static int mds_init_export(struct obd_export *exp)
419 {
420         struct mds_export_data *med = &exp->exp_mds_data;
421
422         INIT_LIST_HEAD(&med->med_open_head);
423         spin_lock_init(&med->med_open_lock);
424         RETURN(0);
425 }
426
427 static int mds_destroy_export(struct obd_export *export)
428 {
429         struct mds_export_data *med;
430         struct obd_device *obd = export->exp_obd;
431         struct lvfs_run_ctxt saved;
432         int rc = 0;
433         ENTRY;
434
435         med = &export->exp_mds_data;
436         target_destroy_export(export);
437
438         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
439                 GOTO(out, 0);
440
441         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
442         /* Close any open files (which may also cause orphan unlinking). */
443         spin_lock(&med->med_open_lock);
444         while (!list_empty(&med->med_open_head)) {
445                 struct list_head *tmp = med->med_open_head.next;
446                 struct mds_file_data *mfd =
447                         list_entry(tmp, struct mds_file_data, mfd_list);
448                 struct dentry *dentry = mfd->mfd_dentry;
449
450                 /* Remove mfd handle so it can't be found again.
451                  * We are consuming the mfd_list reference here. */
452                 mds_mfd_unlink(mfd, 0);
453                 spin_unlock(&med->med_open_lock);
454
455                 /* If you change this message, be sure to update
456                  * replay_single:test_46 */
457                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
458                        "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
459                        dentry->d_name.name, dentry->d_inode->i_ino);
460                 /* child orphan sem protects orphan_dec_test and
461                  * is_orphan race, mds_mfd_close drops it */
462                 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
463                 rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
464                                    !(export->exp_flags & OBD_OPT_FAILOVER));
465
466                 if (rc)
467                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
468                 spin_lock(&med->med_open_lock);
469         }
470         spin_unlock(&med->med_open_lock);
471         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
472 out:
473         mds_client_free(export);
474
475         RETURN(rc);
476 }
477
478 static int mds_disconnect(struct obd_export *exp)
479 {
480         unsigned long irqflags;
481         int rc;
482         ENTRY;
483
484         LASSERT(exp);
485         class_export_get(exp);
486
487         /* Disconnect early so that clients can't keep using export */
488         rc = class_disconnect(exp);
489         ldlm_cancel_locks_for_export(exp);
490
491         /* complete all outstanding replies */
492         spin_lock_irqsave(&exp->exp_lock, irqflags);
493         while (!list_empty(&exp->exp_outstanding_replies)) {
494                 struct ptlrpc_reply_state *rs =
495                         list_entry(exp->exp_outstanding_replies.next,
496                                    struct ptlrpc_reply_state, rs_exp_list);
497                 struct ptlrpc_service *svc = rs->rs_service;
498
499                 spin_lock(&svc->srv_lock);
500                 list_del_init(&rs->rs_exp_list);
501                 ptlrpc_schedule_difficult_reply(rs);
502                 spin_unlock(&svc->srv_lock);
503         }
504         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
505
506         class_export_put(exp);
507         RETURN(rc);
508 }
509
510 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
511                int *size, int lock)
512 {
513         int rc = 0;
514         int lmm_size;
515
516         if (lock)
517                 down(&inode->i_sem);
518         rc = fsfilt_get_md(obd, inode, md, *size, "lov");
519
520         if (rc < 0) {
521                 CERROR("Error %d reading eadata for ino %lu\n",
522                        rc, inode->i_ino);
523         } else if (rc > 0) {
524                 lmm_size = rc;
525                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
526
527                 if (rc == 0) {
528                         *size = lmm_size;
529                         rc = lmm_size;
530                 } else if (rc > 0) {
531                         *size = rc;
532                 }
533         } else {
534                 *size = 0;
535         }
536         if (lock)
537                 up(&inode->i_sem);
538
539         RETURN (rc);
540 }
541
542
543 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
544  * Call with lock=0 if the caller has already taken the i_sem. */
545 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
546                 struct mds_body *body, struct inode *inode, int lock)
547 {
548         struct mds_obd *mds = &obd->u.mds;
549         void *lmm;
550         int lmm_size;
551         int rc;
552         ENTRY;
553
554         lmm = lustre_msg_buf(msg, offset, 0);
555         if (lmm == NULL) {
556                 /* Some problem with getting eadata when I sized the reply
557                  * buffer... */
558                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
559                        inode->i_ino);
560                 RETURN(0);
561         }
562         lmm_size = msg->buflens[offset];
563
564         /* I don't really like this, but it is a sanity check on the client
565          * MD request.  However, if the client doesn't know how much space
566          * to reserve for the MD, it shouldn't be bad to have too much space.
567          */
568         if (lmm_size > mds->mds_max_mdsize) {
569                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
570                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
571                 // RETURN(-EINVAL);
572         }
573
574         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
575         if (rc > 0) {
576                 if (S_ISDIR(inode->i_mode))
577                         body->valid |= OBD_MD_FLDIREA;
578                 else
579                         body->valid |= OBD_MD_FLEASIZE;
580                 body->eadatasize = lmm_size;
581                 rc = 0;
582         }
583
584         RETURN(rc);
585 }
586
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Copy the inode's access ACL xattr into reply buffer \a repoff of
 * \a repmsg and record its length in repbody->aclsize.
 *
 * OBD_MD_FLACL is set in repbody->valid whenever 0 is returned, including
 * when the buffer is empty, the inode has no getxattr method, or the
 * xattr does not exist (-ENODATA).  Any other getxattr error is returned
 * as is, with OBD_MD_FLACL left unset.
 */
static
int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
                       struct mds_body *repbody, int repoff)
{
        /* on-stack dentry carrying only the inode, used as the getxattr
         * argument */
        struct dentry de = { .d_inode = inode };
        int buflen, rc;
        ENTRY;

        LASSERT(repbody->aclsize == 0);
        LASSERT(repmsg->bufcount > repoff);

        buflen = lustre_msg_buflen(repmsg, repoff);
        if (buflen == 0 || inode->i_op == NULL || inode->i_op->getxattr == NULL)
                GOTO(out, 0);

        lock_24kernel();
        rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
                                   lustre_msg_buf(repmsg, repoff, buflen),
                                   buflen);
        unlock_24kernel();

        if (rc < 0 && rc != -ENODATA) {
                CERROR("buflen %d, get acl: %d\n", buflen, rc);
                RETURN(rc);
        }

        if (rc >= 0)
                repbody->aclsize = rc;
        EXIT;
out:
        repbody->valid |= OBD_MD_FLACL;
        return 0;
}
#else
/* without POSIX ACL support packing is a no-op that reports success */
#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
#endif
626
/*
 * Pack ACL data for \a inode into reply buffer \a repoff.  This simply
 * forwards to mds_pack_posix_acl(); \a med is not used.
 */
int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
                 struct lustre_msg *repmsg, struct mds_body *repbody,
                 int repoff)
{
        int rc;

        rc = mds_pack_posix_acl(inode, repmsg, repbody, repoff);
        return rc;
}
633
634 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
635                                 struct ptlrpc_request *req,
636                                 struct mds_body *reqbody, int reply_off)
637 {
638         struct mds_body *body;
639         struct inode *inode = dentry->d_inode;
640         int rc = 0;
641         ENTRY;
642
643         if (inode == NULL)
644                 RETURN(-ENOENT);
645
646         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
647         LASSERT(body != NULL);                 /* caller prepped reply */
648
649         mds_pack_inode2fid(&body->fid1, inode);
650         mds_pack_inode2body(body, inode);
651         reply_off++;
652
653         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
654             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
655                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
656                                  inode, 1);
657
658                 /* If we have LOV EA data, the OST holds size, atime, mtime */
659                 if (!(body->valid & OBD_MD_FLEASIZE) &&
660                     !(body->valid & OBD_MD_FLDIREA))
661                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
662                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
663
664                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
665                 if (body->eadatasize)
666                         reply_off++;
667         } else if (S_ISLNK(inode->i_mode) &&
668                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
669                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
670                 int len;
671
672                 LASSERT (symname != NULL);       /* caller prepped reply */
673                 len = req->rq_repmsg->buflens[reply_off];
674
675                 rc = inode->i_op->readlink(dentry, symname, len);
676                 if (rc < 0) {
677                         CERROR("readlink failed: %d\n", rc);
678                 } else if (rc != len - 1) {
679                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
680                                 rc, len - 1);
681                         rc = -EINVAL;
682                 } else {
683                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
684                         body->valid |= OBD_MD_LINKNAME;
685                         body->eadatasize = rc + 1;
686                         symname[rc] = 0;        /* NULL terminate */
687                         rc = 0;
688                 }
689                 reply_off++;
690         }
691
692         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
693                 struct mds_obd *mds = mds_req2mds(req);
694                 body->max_cookiesize = mds->mds_max_cookiesize;
695                 body->max_mdsize = mds->mds_max_mdsize;
696                 body->valid |= OBD_MD_FLMODEASIZE;
697         }
698
699         if (rc)
700                 RETURN(rc);
701
702 #ifdef CONFIG_FS_POSIX_ACL
703         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
704             (reqbody->valid & OBD_MD_FLACL)) {
705                 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
706                                   inode, req->rq_repmsg,
707                                   body, reply_off);
708
709                 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
710                 if (body->aclsize)
711                         reply_off++;
712         }
713 #endif
714
715         RETURN(rc);
716 }
717
718 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
719                                 int offset)
720 {
721         struct mds_obd *mds = mds_req2mds(req);
722         struct mds_body *body;
723         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
724         ENTRY;
725
726         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
727         LASSERT(body != NULL);                 /* checked by caller */
728         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
729
730         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
731             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
732                 down(&inode->i_sem);
733                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
734                                    "lov");
735                 up(&inode->i_sem);
736                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
737                        rc, inode->i_ino);
738                 if (rc < 0) {
739                         if (rc != -ENODATA) {
740                                 CERROR("error getting inode %lu MD: rc = %d\n",
741                                        inode->i_ino, rc);
742                                 RETURN(rc);
743                         }
744                         size[bufcount] = 0;
745                 } else if (rc > mds->mds_max_mdsize) {
746                         size[bufcount] = 0;
747                         CERROR("MD size %d larger than maximum possible %u\n",
748                                rc, mds->mds_max_mdsize);
749                 } else {
750                         size[bufcount] = rc;
751                 }
752                 bufcount++;
753         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
754                 if (inode->i_size + 1 != body->eadatasize)
755                         CERROR("symlink size: %Lu, reply space: %d\n",
756                                inode->i_size + 1, body->eadatasize);
757                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
758                 bufcount++;
759                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
760                        inode->i_size + 1, body->eadatasize);
761         }
762
763 #ifdef CONFIG_FS_POSIX_ACL
764         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
765             (body->valid & OBD_MD_FLACL)) {
766                 struct dentry de = { .d_inode = inode };
767
768                 size[bufcount] = 0;
769                 if (inode->i_op && inode->i_op->getxattr) {
770                         lock_24kernel();
771                         rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
772                                                    NULL, 0);
773                         unlock_24kernel();
774
775                         if (rc < 0) {
776                                 if (rc != -ENODATA) {
777                                         CERROR("got acl size: %d\n", rc);
778                                         RETURN(rc);
779                                 }
780                         } else
781                                 size[bufcount] = rc;
782                 }
783                 bufcount++;
784         }
785 #endif
786
787         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
788                 CERROR("failed MDS_GETATTR_PACK test\n");
789                 req->rq_status = -ENOMEM;
790                 RETURN(-ENOMEM);
791         }
792
793         rc = lustre_pack_reply(req, bufcount, size, NULL);
794         if (rc) {
795                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
796                 req->rq_status = rc;
797                 RETURN(rc);
798         }
799
800         RETURN(0);
801 }
802
803 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
804                             int child_part, struct lustre_handle *child_lockh)
805 {
806         struct obd_device *obd = req->rq_export->exp_obd;
807         struct mds_obd *mds = &obd->u.mds;
808         struct ldlm_reply *rep = NULL;
809         struct lvfs_run_ctxt saved;
810         struct mds_body *body;
811         struct dentry *dparent = NULL, *dchild = NULL;
812         struct lvfs_ucred uc = {NULL,};
813         struct lustre_handle parent_lockh;
814         int namesize;
815         int rc = 0, cleanup_phase = 0, resent_req = 0;
816         char *name;
817         ENTRY;
818
819         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
820
821         /* Swab now, before anyone looks inside the request */
822
823         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
824                                   lustre_swab_mds_body);
825         if (body == NULL) {
826                 CERROR("Can't swab mds_body\n");
827                 RETURN(-EFAULT);
828         }
829
830         LASSERT_REQSWAB(req, offset + 1);
831         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
832         if (name == NULL) {
833                 CERROR("Can't unpack name\n");
834                 RETURN(-EFAULT);
835         }
836         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
837
838         rc = mds_init_ucred(&uc, req, offset);
839         if (rc)
840                 GOTO(cleanup, rc);
841
842         LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
843         /* if requests were at offset 2, the getattr reply goes back at 1 */
844         if (offset == MDS_REQ_INTENT_REC_OFF) {
845                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
846                 offset = 1;
847         }
848
849         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
850         cleanup_phase = 1; /* kernel context */
851         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
852
853         /* FIXME: handle raw lookup */
854 #if 0
855         if (body->valid == OBD_MD_FLID) {
856                 struct mds_body *mds_reply;
857                 int size = sizeof(*mds_reply);
858                 ino_t inum;
859                 // The user requested ONLY the inode number, so do a raw lookup
860                 rc = lustre_pack_reply(req, 1, &size, NULL);
861                 if (rc) {
862                         CERROR("out of memory\n");
863                         GOTO(cleanup, rc);
864                 }
865
866                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
867
868                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
869                                            sizeof(*mds_reply));
870                 mds_reply->fid1.id = inum;
871                 mds_reply->valid = OBD_MD_FLID;
872                 GOTO(cleanup, rc);
873         }
874 #endif
875
876         if (lustre_handle_is_used(child_lockh)) {
877                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
878                 resent_req = 1;
879         }
880
881         if (resent_req == 0) {
882             if (name) {
883                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
884                                                  &parent_lockh, &dparent,
885                                                  LCK_CR,
886                                                  MDS_INODELOCK_UPDATE,
887                                                  name, namesize,
888                                                  child_lockh, &dchild, LCK_CR,
889                                                  child_part);
890             } else {
891                         /* For revalidate by fid we always take UPDATE lock */
892                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
893                                                        LCK_CR, child_lockh,
894                                                        NULL, 0,
895                                                        MDS_INODELOCK_UPDATE);
896                         LASSERT(dchild);
897                         if (IS_ERR(dchild))
898                                 rc = PTR_ERR(dchild);
899             }
900             if (rc)
901                     GOTO(cleanup, rc);
902         } else {
903                 struct ldlm_lock *granted_lock;
904                 struct ll_fid child_fid;
905                 struct ldlm_resource *res;
906                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
907                 granted_lock = ldlm_handle2lock(child_lockh);
908                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
909                          body->fid1.id, body->fid1.generation,
910                          child_lockh->cookie);
911
912
913                 res = granted_lock->l_resource;
914                 child_fid.id = res->lr_name.name[0];
915                 child_fid.generation = res->lr_name.name[1];
916                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
917                 LASSERT(!IS_ERR(dchild));
918                 LDLM_LOCK_PUT(granted_lock);
919         }
920
921         cleanup_phase = 2; /* dchild, dparent, locks */
922
923         if (dchild->d_inode == NULL) {
924                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
925                 /* in the intent case, the policy clears this error:
926                    the disposition is enough */
927                 GOTO(cleanup, rc = -ENOENT);
928         } else {
929                 intent_set_disposition(rep, DISP_LOOKUP_POS);
930         }
931
932         if (req->rq_repmsg == NULL) {
933                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
934                 if (rc != 0) {
935                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
936                         GOTO (cleanup, rc);
937                 }
938         }
939
940         rc = mds_getattr_internal(obd, dchild, req, body, offset);
941         GOTO(cleanup, rc); /* returns the lock to the client */
942
943  cleanup:
944         switch (cleanup_phase) {
945         case 2:
946                 if (resent_req == 0) {
947                         if (rc && dchild->d_inode)
948                                 ldlm_lock_decref(child_lockh, LCK_CR);
949                         ldlm_lock_decref(&parent_lockh, LCK_CR);
950                         l_dput(dparent);
951                 }
952                 l_dput(dchild);
953         case 1:
954                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
955         default:
956                 mds_exit_ucred(&uc, mds);
957                 if (req->rq_reply_state == NULL) {
958                         req->rq_status = rc;
959                         lustre_pack_reply(req, 0, NULL, NULL);
960                 }
961         }
962         return rc;
963 }
964
965 static int mds_getattr(struct ptlrpc_request *req, int offset)
966 {
967         struct mds_obd *mds = mds_req2mds(req);
968         struct obd_device *obd = req->rq_export->exp_obd;
969         struct lvfs_run_ctxt saved;
970         struct dentry *de;
971         struct mds_body *body;
972         struct lvfs_ucred uc = {NULL,};
973         int rc = 0;
974         ENTRY;
975
976         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
977                                   lustre_swab_mds_body);
978         if (body == NULL)
979                 RETURN(-EFAULT);
980
981         rc = mds_init_ucred(&uc, req, offset);
982         if (rc)
983                 GOTO(out_ucred, rc);
984
985         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
986         de = mds_fid2dentry(mds, &body->fid1, NULL);
987         if (IS_ERR(de)) {
988                 rc = req->rq_status = PTR_ERR(de);
989                 GOTO(out_pop, rc);
990         }
991
992         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
993         if (rc != 0) {
994                 CERROR("mds_getattr_pack_msg: %d\n", rc);
995                 GOTO(out_pop, rc);
996         }
997
998         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
999
1000         l_dput(de);
1001         GOTO(out_pop, rc);
1002 out_pop:
1003         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1004 out_ucred:
1005         if (req->rq_reply_state == NULL) {
1006                 req->rq_status = rc;
1007                 lustre_pack_reply(req, 0, NULL, NULL);
1008         }
1009         mds_exit_ucred(&uc, mds);
1010         return rc;
1011 }
1012
1013 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1014                           unsigned long max_age)
1015 {
1016         int rc;
1017
1018         spin_lock(&obd->obd_osfs_lock);
1019         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1020         if (rc == 0)
1021                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1022         spin_unlock(&obd->obd_osfs_lock);
1023
1024         return rc;
1025 }
1026
1027 static int mds_statfs(struct ptlrpc_request *req)
1028 {
1029         struct obd_device *obd = req->rq_export->exp_obd;
1030         int rc, size = sizeof(struct obd_statfs);
1031         ENTRY;
1032
1033         /* This will trigger a watchdog timeout */
1034         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1035                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1036
1037         rc = lustre_pack_reply(req, 1, &size, NULL);
1038         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1039                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1040                 GOTO(out, rc);
1041         }
1042
1043         /* We call this so that we can cache a bit - 1 jiffie worth */
1044         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1045                             jiffies - HZ);
1046         if (rc) {
1047                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1048                 GOTO(out, rc);
1049         }
1050
1051         EXIT;
1052 out:
1053         req->rq_status = rc;
1054         return 0;
1055 }
1056
1057 static int mds_sync(struct ptlrpc_request *req, int offset)
1058 {
1059         struct obd_device *obd = req->rq_export->exp_obd;
1060         struct mds_obd *mds = &obd->u.mds;
1061         struct mds_body *body;
1062         int rc, size = sizeof(*body);
1063         ENTRY;
1064
1065         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
1066         if (body == NULL)
1067                 GOTO(out, rc = -EFAULT);
1068
1069         rc = lustre_pack_reply(req, 1, &size, NULL);
1070         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1071                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1072                 GOTO(out, rc);
1073         }
1074
1075         if (body->fid1.id == 0) {
1076                 /* a fid of zero is taken to mean "sync whole filesystem" */
1077                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1078                 GOTO(out, rc);
1079         } else {
1080                 struct dentry *de;
1081
1082                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1083                 if (IS_ERR(de))
1084                         GOTO(out, rc = PTR_ERR(de));
1085
1086                 /* The file parameter isn't used for anything */
1087                 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1088                         rc = de->d_inode->i_fop->fsync(NULL, de, 1);
1089                 if (rc == 0) {
1090                         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1091                         mds_pack_inode2fid(&body->fid1, de->d_inode);
1092                         mds_pack_inode2body(body, de->d_inode);
1093                 }
1094
1095                 l_dput(de);
1096                 GOTO(out, rc);
1097         }
1098 out:
1099         req->rq_status = rc;
1100         return 0;
1101 }
1102
1103 /* mds_readpage does not take a DLM lock on the inode, because the client must
1104  * already have a PR lock.
1105  *
1106  * If we were to take another one here, a deadlock will result, if another
1107  * thread is already waiting for a PW lock. */
1108 static int mds_readpage(struct ptlrpc_request *req, int offset)
1109 {
1110         struct obd_device *obd = req->rq_export->exp_obd;
1111         struct mds_obd *mds = &obd->u.mds;
1112         struct vfsmount *mnt;
1113         struct dentry *de;
1114         struct file *file;
1115         struct mds_body *body, *repbody;
1116         struct lvfs_run_ctxt saved;
1117         int rc, size = sizeof(*repbody);
1118         struct lvfs_ucred uc = {NULL,};
1119         ENTRY;
1120
1121         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1122                 RETURN(-ENOMEM);
1123
1124         rc = lustre_pack_reply(req, 1, &size, NULL);
1125         if (rc) {
1126                 CERROR("error packing readpage reply: rc %d\n", rc);
1127                 GOTO(out, rc);
1128         }
1129
1130         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1131                                   lustre_swab_mds_body);
1132         if (body == NULL)
1133                 GOTO (out, rc = -EFAULT);
1134
1135         rc = mds_init_ucred(&uc, req, 0);
1136         if (rc)
1137                 GOTO(out, rc);
1138
1139         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1140         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1141         if (IS_ERR(de))
1142                 GOTO(out_pop, rc = PTR_ERR(de));
1143
1144         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1145
1146         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1147         /* note: in case of an error, dentry_open puts dentry */
1148         if (IS_ERR(file))
1149                 GOTO(out_pop, rc = PTR_ERR(file));
1150
1151         /* body->size is actually the offset -eeb */
1152         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1153                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1154                        body->size, de->d_inode->i_blksize);
1155                 GOTO(out_file, rc = -EFAULT);
1156         }
1157
1158         /* body->nlink is actually the #bytes to read -eeb */
1159         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1160                 CERROR("size %u is not multiple of blocksize %lu\n",
1161                        body->nlink, de->d_inode->i_blksize);
1162                 GOTO(out_file, rc = -EFAULT);
1163         }
1164
1165         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1166         repbody->size = file->f_dentry->d_inode->i_size;
1167         repbody->valid = OBD_MD_FLSIZE;
1168
1169         /* to make this asynchronous make sure that the handling function
1170            doesn't send a reply when this function completes. Instead a
1171            callback function would send the reply */
1172         /* body->size is actually the offset -eeb */
1173         rc = mds_sendpage(req, file, body->size, body->nlink);
1174
1175 out_file:
1176         filp_close(file, 0);
1177 out_pop:
1178         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1179 out:
1180         mds_exit_ucred(&uc, mds);
1181         req->rq_status = rc;
1182         RETURN(0);
1183 }
1184
1185 int mds_reint(struct ptlrpc_request *req, int offset,
1186               struct lustre_handle *lockh)
1187 {
1188         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1189         int rc;
1190
1191         OBD_ALLOC(rec, sizeof(*rec));
1192         if (rec == NULL)
1193                 RETURN(-ENOMEM);
1194
1195         rc = mds_update_unpack(req, offset, rec);
1196         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1197                 CERROR("invalid record\n");
1198                 GOTO(out, req->rq_status = -EINVAL);
1199         }
1200
1201         /* rc will be used to interrupt a for loop over multiple records */
1202         rc = mds_reint_rec(rec, offset, req, lockh);
1203  out:
1204         OBD_FREE(rec, sizeof(*rec));
1205         return rc;
1206 }
1207
1208 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1209                                        struct obd_device *obd, int *process)
1210 {
1211         switch (req->rq_reqmsg->opc) {
1212         case MDS_CONNECT: /* This will never get here, but for completeness. */
1213         case OST_CONNECT: /* This will never get here, but for completeness. */
1214         case MDS_DISCONNECT:
1215         case OST_DISCONNECT:
1216                *process = 1;
1217                RETURN(0);
1218
1219         case MDS_CLOSE:
1220         case MDS_SYNC: /* used in unmounting */
1221         case OBD_PING:
1222         case MDS_REINT:
1223         case LDLM_ENQUEUE:
1224                 *process = target_queue_recovery_request(req, obd);
1225                 RETURN(0);
1226
1227         default:
1228                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1229                 *process = 0;
1230                 /* XXX what should we set rq_status to here? */
1231                 req->rq_status = -EAGAIN;
1232                 RETURN(ptlrpc_error(req));
1233         }
1234 }
1235
1236 static char *reint_names[] = {
1237         [REINT_SETATTR] "setattr",
1238         [REINT_CREATE]  "create",
1239         [REINT_LINK]    "link",
1240         [REINT_UNLINK]  "unlink",
1241         [REINT_RENAME]  "rename",
1242         [REINT_OPEN]    "open",
1243 };
1244
1245 static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1246 {
1247         char *key;
1248         __u32 *val;
1249         int keylen, rc = 0;
1250         ENTRY;
1251
1252         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1253         if (key == NULL) {
1254                 DEBUG_REQ(D_HA, req, "no set_info key");
1255                 RETURN(-EFAULT);
1256         }
1257         keylen = req->rq_reqmsg->buflens[0];
1258
1259         val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
1260         if (val == NULL) {
1261                 DEBUG_REQ(D_HA, req, "no set_info val");
1262                 RETURN(-EFAULT);
1263         }
1264
1265         rc = lustre_pack_reply(req, 0, NULL, NULL);
1266         if (rc)
1267                 RETURN(rc);
1268         req->rq_repmsg->status = 0;
1269
1270         if (keylen < strlen("read-only") ||
1271             memcmp(key, "read-only", keylen) != 0)
1272                 RETURN(-EINVAL);
1273
1274         if (*val)
1275                 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1276         else
1277                 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1278
1279         RETURN(0);
1280 }
1281
1282 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1283 {
1284         struct obd_quotactl *oqctl;
1285         int rc;
1286         ENTRY;
1287
1288         oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1289                                    lustre_swab_obd_quotactl);
1290         if (oqctl == NULL)
1291                 RETURN(-EPROTO);
1292
1293         rc = lustre_pack_reply(req, 0, NULL, NULL);
1294         if (rc) {
1295                 CERROR("mds: out of memory while packing quotacheck reply\n");
1296                 RETURN(rc);
1297         }
1298
1299         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1300         RETURN(0);
1301 }
1302
1303 static int mds_handle_quotactl(struct ptlrpc_request *req)
1304 {
1305         struct obd_quotactl *oqctl, *repoqc;
1306         int rc, size = sizeof(*repoqc);
1307         ENTRY;
1308
1309         oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1310                                    lustre_swab_obd_quotactl);
1311         if (oqctl == NULL)
1312                 RETURN(-EPROTO);
1313
1314         rc = lustre_pack_reply(req, 1, &size, NULL);
1315         if (rc)
1316                 RETURN(rc);
1317
1318         repoqc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repoqc));
1319
1320         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1321         *repoqc = *oqctl;
1322         RETURN(0);
1323 }
1324
1325 static int mds_msg_check_version(struct lustre_msg *msg)
1326 {
1327         int rc;
1328
1329         /* TODO: enable the below check while really introducing msg version.
1330          * it's disabled because it will break compatibility with b1_4.
1331          */
1332         return (0);
1333
1334         switch (msg->opc) {
1335         case MDS_CONNECT:
1336         case MDS_DISCONNECT:
1337         case OBD_PING:
1338                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1339                 if (rc)
1340                         CERROR("bad opc %u version %08x, expecting %08x\n",
1341                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
1342                 break;
1343         case MDS_GETSTATUS:
1344         case MDS_GETATTR:
1345         case MDS_GETATTR_NAME:
1346         case MDS_STATFS:
1347         case MDS_READPAGE:
1348         case MDS_REINT:
1349         case MDS_CLOSE:
1350         case MDS_DONE_WRITING:
1351         case MDS_PIN:
1352         case MDS_SYNC:
1353         case MDS_GETXATTR:
1354         case MDS_SETXATTR:
1355         case MDS_SET_INFO:
1356         case MDS_QUOTACHECK:
1357         case MDS_QUOTACTL:
1358         case QUOTA_DQACQ:
1359         case QUOTA_DQREL:
1360                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1361                 if (rc)
1362                         CERROR("bad opc %u version %08x, expecting %08x\n",
1363                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
1364                 break;
1365         case LDLM_ENQUEUE:
1366         case LDLM_CONVERT:
1367         case LDLM_BL_CALLBACK:
1368         case LDLM_CP_CALLBACK:
1369                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1370                 if (rc)
1371                         CERROR("bad opc %u version %08x, expecting %08x\n",
1372                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
1373                 break;
1374         case OBD_LOG_CANCEL:
1375         case LLOG_ORIGIN_HANDLE_CREATE:
1376         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1377         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1378         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1379         case LLOG_ORIGIN_HANDLE_CLOSE:
1380         case LLOG_CATINFO:
1381                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1382                 if (rc)
1383                         CERROR("bad opc %u version %08x, expecting %08x\n",
1384                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1385                 break;
1386         default:
1387                 CERROR("MDS unknown opcode %d\n", msg->opc);
1388                 rc = -ENOTSUPP;
1389         }
1390         return rc;
1391 }
1392
1393
/*
 * Per-handler flags stored in mdt_handler::mh_flags.
 */
enum mdt_handler_flags {
        /*
         * The request carries a struct mds_body in its record buffer;
         * mdt_req_handle() unpacks it and looks up the object named by
         * its fid1 before calling the handler.
         */
        HABEO_CORPUS = 1 << 0
};
1400
1401 struct mdt_handler {
1402         const char *mh_name;
1403         int         mh_fail_id;
1404         __u32       mh_opc;
1405         __u32       mh_flags;
1406         int (*mh_act)(struct mdt_thread_info *info,
1407                       struct ptlrpc_request *req, int offset);
1408 };
1409
/*
 * DEF_HNDL() expands to one designated array element describing a handler.
 * The slot index is the opcode's distance from the first opcode of its
 * family (pfx_first); the name is the stringified opcode suffix and the
 * fail id is OBD_FAIL_<pfx>_<op>_NET.
 */
#define DEF_HNDL(pfx, first, hflags, op, act)                   \
[pfx ## _ ## op - pfx ## _ ## first] = {                        \
        .mh_name    = #op,                                      \
        .mh_fail_id = OBD_FAIL_ ## pfx ## _ ## op ## _NET,      \
        .mh_opc     = pfx ## _ ## op,                           \
        .mh_flags   = hflags,                                   \
        .mh_act     = act                                       \
}

/* Shorthand for the MDS_* opcode family, whose first opcode is MDS_GETATTR. */
#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
1420
1421 static struct mdt_handler mdt_mds_ops[] = {
1422         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1423
1424         DEF_MDT_HNDL(0,            CONNECT,        mds_connect),
1425         DEF_MDT_HNDL(0,            DISCONNECT,     mds_disconnect),
1426         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mds_getattr),
1427         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mds_getattr_name),
1428         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mds_setxattr),
1429         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mds_getxattr),
1430         DEF_MDT_HNDL(0,            STATFS,         mds_statfs),
1431         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mds_readpage),
1432         DEF_MDT_HNDL(0,            REINT,          mds_reint),
1433         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mds_close),
1434         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mds_done_writing),
1435         DEF_MDT_HNDL(0,            PIN,            mds_pin),
1436         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mds_sync),
1437         DEF_MDT_HNDL(0,            SET_INFO,       mds_set_info),
1438         DEF_MDT_HNDL(0,            QUOTACHECK,     mds_handle_quotacheck),
1439         DEF_MDT_HNDL(0,            QUOTACTL,       mds_handle_quotactl)
1440 };
1441
1442 static struct mdt_handler mdt_obd_ops[] = {
1443 };
1444
1445 static struct mdt_handler mdt_dlm_ops[] = {
1446 };
1447
1448 static struct mdt_handler mdt_llog_ops[] = {
1449 };
1450
1451 static struct mdt_opc_slice {
1452         __u32               mos_opc_start;
1453         int                 mos_opc_end;
1454         struct mdt_handler *mos_hs;
1455 } mdt_handlers[] = {
1456         {
1457                 .mos_opc_start = MDS_GETATTR,
1458                 .mos_opc_end   = MDS_LAST_OPC,
1459                 .mos_hs        = mdt_mds_ops
1460         },
1461         {
1462                 .mos_opc_start = OBD_PING,
1463                 .mos_opc_end   = OBD_LAST_OPC,
1464                 .mos_hs        = mdt_obd_ops
1465         },
1466         {
1467                 .mos_opc_start = LDLM_ENQUEUE,
1468                 .mos_opc_end   = LDLM_LAST_OPC,
1469                 .mos_hs        = mdt_dlm_ops
1470         },
1471         {
1472                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1473                 .mos_opc_end   = LLOG_LAST_OPC,
1474                 .mos_hs        = mdt_llog_ops
1475         }
1476 };
1477
1478 struct mdt_handler *mdt_handler_find(__u32 opc)
1479 {
1480         int i;
1481         struct mdt_opc_slice *s;
1482         struct mdt_handler *h;
1483
1484         h = NULL;
1485         for (i = 0, s = mdt_handlers; i < ARRAY_SIZE(mdt_handlers); i++, s++) {
1486                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1487                         h = s->mos_hs + (opc - s->mos_opc_start);
1488                         if (h->mh_opc != 0)
1489                                 LASSERT(h->mh_opc == opc);
1490                         else
1491                                 h = NULL; /* unsupported opc */
1492                         break;
1493                 }
1494         }
1495         return h;
1496 }
1497
1498 struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f)
1499 {
1500         struct lu_object *o;
1501
1502         o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
1503         if (IS_ERR(o))
1504                 return (struct mdt_object *)o;
1505         else
1506                 return container_of(o, struct mdt_object, mot_obj.mo_lu);
1507 }
1508
1509 void mdt_object_put(struct mdt_object *o)
1510 {
1511         lu_object_put(&o->mot_obj.mo_lu);
1512 }
1513
1514 static int mdt_req_handle(struct mdt_thread_info *info,
1515                           struct mdt_handler *h, struct ptlrpc_request *req,
1516                           int shift)
1517 {
1518         int result;
1519         int off;
1520
1521         ENTRY;
1522
1523         LASSERT(h->mh_act != NULL);
1524         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
1525
1526         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1527
1528         if (h->mh_fail_id != 0)
1529                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
1530
1531         off = MDS_REQ_REC_OFF + shift;
1532         result = 0;
1533         if (h->mh_flags & HABEO_CORPUS) {
1534                 info->mti_body = lustre_swab_reqbuf(req, off,
1535                                                     sizeof *info->mti_body,
1536                                                     lustre_swab_mds_body);
1537                 if (info->mti_body == NULL) {
1538                         CERROR("Can't unpack body\n");
1539                         result = req->rq_status = -EFAULT;
1540                 }
1541                 info->mti_object = mdt_object_find(info->mti_mdt,
1542                                                    &info->mti_body->fid1);
1543                 if (IS_ERR(info->mti_object))
1544                         result = PTR_ERR(info->mti_object);
1545         }
1546         if (result == 0)
1547                 result = h->mh_act(info, req, off);
1548         /*
1549          * XXX result value is unconditionally shoved into ->rq_status
1550          * (original code sometimes placed error code into ->rq_status, and
1551          * sometimes returned it to the
1552          * caller). ptlrpc_server_handle_request() doesn't check return value
1553          * anyway.
1554          */
1555         req->rq_status = result;
1556         RETURN(result);
1557 }
1558
1559 static void mdt_thread_info_init(struct mdt_thread_info *info)
1560 {
1561         memset(info, 0, sizeof *info);
1562         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1563         /*
1564          * Poison size array.
1565          */
1566         for (info->mti_rep_buf_nr = 0;
1567              info->mti_rep_buf_nr < MDT_REP_BUF_NR_MAX; info->mti_rep_buf_nr++)
1568                 info->mti_rep_buf_size[info->mti_rep_buf_nr] = ~0;
1569 }
1570
1571 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1572 {
1573         if (info->mti_object != NULL) {
1574                 mdt_object_put(info->mti_object);
1575                 info->mti_object = NULL;
1576         }
1577 }
1578
1579 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
1580 {
1581         int rc;
1582         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1583         struct obd_device *obd = NULL;
1584         struct mdt_handler *h;
1585
1586         ENTRY;
1587
1588         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1589
1590         LASSERT(current->journal_info == NULL);
1591
1592         rc = mds_msg_check_version(req->rq_reqmsg);
1593         if (rc) {
1594                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1595                 RETURN(rc);
1596         }
1597
1598         /* XXX identical to OST */
1599         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1600                 struct mds_export_data *med;
1601                 int recovering, abort_recovery;
1602
1603                 if (req->rq_export == NULL) {
1604                         CERROR("operation %d on unconnected MDS from %s\n",
1605                                req->rq_reqmsg->opc,
1606                                libcfs_id2str(req->rq_peer));
1607                         req->rq_status = -ENOTCONN;
1608                         GOTO(out, rc = -ENOTCONN);
1609                 }
1610
1611                 med = &req->rq_export->exp_mds_data;
1612                 obd = req->rq_export->exp_obd;
1613                 mds = &obd->u.mds;
1614
1615                 /* sanity check: if the xid matches, the request must
1616                  * be marked as a resent or replayed */
1617                 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1618                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1619                                  (MSG_RESENT | MSG_REPLAY),
1620                                  "rq_xid "LPU64" matches last_xid, "
1621                                  "expected RESENT flag\n",
1622                                  req->rq_xid);
1623                 /* else: note the opposite is not always true; a
1624                  * RESENT req after a failover will usually not match
1625                  * the last_xid, since it was likely never
1626                  * committed. A REPLAYed request will almost never
1627                  * match the last xid, however it could for a
1628                  * committed, but still retained, open. */
1629
1630                 /* Check for aborted recovery. */
1631                 spin_lock_bh(&obd->obd_processing_task_lock);
1632                 abort_recovery = obd->obd_abort_recovery;
1633                 recovering = obd->obd_recovering;
1634                 spin_unlock_bh(&obd->obd_processing_task_lock);
1635                 if (abort_recovery) {
1636                         target_abort_recovery(obd);
1637                 } else if (recovering) {
1638                         int should_process;
1639
1640                         rc = mds_filter_recovery_request(req, obd,
1641                                                          &should_process);
1642                         if (rc || !should_process)
1643                                 RETURN(rc);
1644                 }
1645         }
1646
1647         h = mdt_handler_find(req->rq_reqmsg->opc);
1648         if (h != NULL) {
1649                 rc = mdt_req_handle(info, h, req, 0);
1650         } else {
1651                 req->rq_status = -ENOTSUPP;
1652                 rc = ptlrpc_error(req);
1653                 RETURN(rc);
1654         }
1655
1656         LASSERT(current->journal_info == NULL);
1657
1658         /* If we're DISCONNECTing, the mds_export_data is already freed */
1659         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1660                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1661                 req->rq_repmsg->last_xid =
1662                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1663
1664                 target_committed_to_req(req);
1665         }
1666
1667         EXIT;
1668  out:
1669
1670         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1671                 if (obd && obd->obd_recovering) {
1672                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1673                         RETURN(target_queue_final_reply(req, rc));
1674                 }
1675                 /* Lost a race with recovery; let the error path DTRT. */
1676                 rc = req->rq_status = -ENOTCONN;
1677         }
1678
1679         target_send_reply(req, rc, info->mti_fail_id);
1680         RETURN(0);
1681 }
1682
1683 static struct lu_device_operations mdt_lu_ops;
1684
1685 static int lu_device_is_mdt(struct lu_device *d)
1686 {
1687         /*
1688          * XXX for now. Tags in lu_device_type->ldt_something are needed.
1689          */
1690         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
1691 }
1692
1693 static struct mdt_object *mdt_obj(struct lu_object *o)
1694 {
1695         LASSERT(lu_device_is_mdt(o->lo_dev));
1696         return container_of(o, struct mdt_object, mot_obj.mo_lu);
1697 }
1698
1699 static struct mdt_device *mdt_dev(struct lu_device *d)
1700 {
1701         LASSERT(lu_device_is_mdt(d));
1702         return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
1703 }
1704
1705 int mdt_handle(struct ptlrpc_request *req)
1706 {
1707         int result;
1708
1709         struct mdt_thread_info info; /* XXX on stack for now */
1710         mdt_thread_info_init(&info);
1711         info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1712
1713         result = mdt_handle0(req, &info);
1714
1715         mdt_thread_info_fini(&info);
1716         return result;
1717 }
1718
1719 static int mdt_intent_policy(struct ldlm_namespace *ns,
1720                              struct ldlm_lock **lockp, void *req_cookie,
1721                              ldlm_mode_t mode, int flags, void *data)
1722 {
1723         RETURN(ELDLM_LOCK_ABORTED);
1724 }
1725
1726 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
1727                                             svc_handler_t h, char *name,
1728                                             struct proc_dir_entry *proc_entry,
1729                                             svcreq_printfn_t prntfn)
1730 {
1731         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
1732                                c->psc_max_req_size, c->psc_max_reply_size,
1733                                c->psc_req_portal, c->psc_rep_portal,
1734                                c->psc_watchdog_timeout,
1735                                h, name, proc_entry,
1736                                prntfn, c->psc_num_threads);
1737 }
1738
1739 int md_device_init(struct md_device *md, struct lu_device_type *t)
1740 {
1741         return lu_device_init(&md->md_lu_dev, t);
1742 }
1743
1744 void md_device_fini(struct md_device *md)
1745 {
1746         lu_device_fini(&md->md_lu_dev);
1747 }
1748
1749 static void mdt_fini(struct lu_device *d)
1750 {
1751         struct mdt_device *m = mdt_dev(d);
1752
1753         if (d->ld_site != NULL) {
1754                 lu_site_fini(d->ld_site);
1755                 d->ld_site = NULL;
1756         }
1757         if (m->mdt_service != NULL) {
1758                 ptlrpc_unregister_service(m->mdt_service);
1759                 m->mdt_service = NULL;
1760         }
1761         if (m->mdt_namespace != NULL) {
1762                 ldlm_namespace_free(m->mdt_namespace, 0);
1763                 m->mdt_namespace = NULL;
1764         }
1765         
1766         LASSERT(atomic_read(&d->ld_ref) == 0);
1767         md_device_fini(&m->mdt_md_dev);
1768 }
1769
1770 static int mdt_init0(struct mdt_device *m,
1771                      struct lu_device_type *t, struct lustre_cfg *cfg)
1772 {
1773         struct lu_site *s;
1774         char   ns_name[48];
1775
1776         ENTRY;
1777
1778         OBD_ALLOC_PTR(s);
1779         if (s == NULL)
1780                 return -ENOMEM;
1781
1782         md_device_init(&m->mdt_md_dev, t);
1783
1784         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1785
1786         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
1787         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
1788         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
1789         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
1790         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
1791         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
1792         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1793         /*
1794          * We'd like to have a mechanism to set this on a per-device basis,
1795          * but alas...
1796          */
1797         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1798                                                       MDT_MIN_THREADS),
1799                                                   MDT_MAX_THREADS);
1800         lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1801
1802         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1803         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1804         if (m->mdt_namespace == NULL)
1805                 return -ENOMEM;
1806         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1807
1808         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1809                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1810
1811         m->mdt_service =
1812                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1813                                      LUSTRE_MDT0_NAME,
1814                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1815                                      NULL);
1816         if (m->mdt_service == NULL)
1817                 return -ENOMEM;
1818
1819         return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1820 }
1821
1822 struct lu_object *mdt_object_alloc(struct lu_device *d)
1823 {
1824         struct mdt_object *mo;
1825
1826         OBD_ALLOC_PTR(mo);
1827         if (mo != NULL) {
1828                 struct lu_object *o;
1829                 struct lu_object_header *h;
1830
1831                 o = &mo->mot_obj.mo_lu;
1832                 h = &mo->mot_header;
1833                 lu_object_header_init(h);
1834                 lu_object_init(o, h, d);
1835                 /* ->lo_depth and ->lo_flags are automatically 0 */
1836                 lu_object_add_top(h, o);
1837                 return o;
1838         } else
1839                 return NULL;
1840 }
1841
1842 int mdt_object_init(struct lu_object *o)
1843 {
1844         struct mdt_device *d = mdt_dev(o->lo_dev);
1845         struct lu_device  *under;
1846         struct lu_object  *below;
1847
1848         under = &d->mdt_child->md_lu_dev;
1849         below = under->ld_ops->ldo_object_alloc(under);
1850         if (below != NULL) {
1851                 lu_object_add(o, below);
1852                 return 0;
1853         } else
1854                 return -ENOMEM;
1855 }
1856
1857 void mdt_object_free(struct lu_object *o)
1858 {
1859         struct lu_object_header *h;
1860
1861         h = o->lo_header;
1862         lu_object_fini(o);
1863         lu_object_header_fini(h);
1864 }
1865
/*
 * ->ldo_object_release() method of the mdt layer.
 */
void mdt_object_release(struct lu_object *o)
{
        /* intentionally empty: nothing is done for @o here */
}
1869
1870 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
1871 {
1872         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1873 }
1874
1875 static struct lu_device_operations mdt_lu_ops = {
1876         .ldo_object_alloc   = mdt_object_alloc,
1877         .ldo_object_init    = mdt_object_init,
1878         .ldo_object_free    = mdt_object_free,
1879         .ldo_object_release = mdt_object_release,
1880         .ldo_object_print   = mdt_object_print
1881 };
1882
1883 static struct ll_fid *mdt_object_fid(struct mdt_object *o)
1884 {
1885         return lu_object_fid(&o->mot_obj.mo_lu);
1886 }
1887
1888 static int mdt_object_lock(struct mdt_object *o, ldlm_mode_t mode)
1889 {
1890         return fid_lock(mdt_object_fid(o), &o->mot_lh, mode);
1891 }
1892
1893 static void mdt_object_unlock(struct mdt_object *o, ldlm_mode_t mode)
1894 {
1895         fid_unlock(mdt_object_fid(o), &o->mot_lh, mode);
1896 }
1897
1898 int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name)
1899 {
1900         struct mdt_object *o;
1901         int result;
1902
1903         o = mdt_object_find(d, pfid);
1904         if (IS_ERR(o))
1905                 return PTR_ERR(o);
1906         result = mdt_object_lock(o, LCK_PW);
1907         if (result == 0) {
1908                 result = d->mdt_child->md_ops->mdo_mkdir(&o->mot_obj, name);
1909                 mdt_object_unlock(o, LCK_PW);
1910         }
1911         mdt_object_put(o);
1912         return result;
1913 }
1914
1915 static struct obd_ops mdt_obd_device_ops = {
1916         .o_owner           = THIS_MODULE
1917 };
1918
1919 struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1920                                    struct lustre_cfg *cfg)
1921 {
1922         struct lu_device  *l;
1923         struct mdt_device *m;
1924
1925         OBD_ALLOC_PTR(m);
1926         if (m != NULL) {
1927                 int result;
1928
1929                 l = &m->mdt_md_dev.md_lu_dev;
1930                 result = mdt_init0(m, t, cfg);
1931                 if (result != 0) {
1932                         mdt_fini(l);
1933                         m = ERR_PTR(result);
1934                 }
1935         } else
1936                 l = ERR_PTR(-ENOMEM);
1937         return l;
1938 }
1939
/*
 * Finalize and free the mdt device whose embedded lu_device is @m.
 */
void mdt_device_free(struct lu_device *m)
{
        /*
         * The allocation made in mdt_device_alloc() is a struct mdt_device,
         * so it has to be freed through a pointer of that type:
         * OBD_FREE_PTR() on the lu_device pointer would use the wrong
         * size. Look the container up before the device is finalized.
         */
        struct mdt_device *mdt = mdt_dev(m);

        mdt_fini(m);
        OBD_FREE_PTR(mdt);
}
1945
/*
 * ->ldto_init() method of the mdt device type. There is nothing to set
 * up; always returns 0.
 */
int mdt_type_init(struct lu_device_type *t)
{
        int rc = 0;

        return rc;
}
1950
/*
 * ->ldto_fini() method of the mdt device type.
 */
void mdt_type_fini(struct lu_device_type *t)
{
        /* intentionally empty: mdt_type_init() sets nothing up */
}
1954
1955 static struct lu_device_type_operations mdt_device_type_ops = {
1956         .ldto_init = mdt_type_init,
1957         .ldto_fini = mdt_type_fini,
1958
1959         .ldto_device_alloc = mdt_device_alloc,
1960         .ldto_device_free  = mdt_device_free
1961 };
1962
1963 static struct lu_device_type mdt_device_type = {
1964         .ldt_name = LUSTRE_MDT0_NAME,
1965         .ldt_ops  = &mdt_device_type_ops
1966 };
1967
1968 static int __init mdt_mod_init(void)
1969 {
1970         struct lprocfs_static_vars lvars;
1971         struct obd_type *type;
1972         int result;
1973
1974         mdt_num_threads = MDT_NUM_THREADS;
1975         lprocfs_init_vars(mdt, &lvars);
1976         result = class_register_type(&mdt_obd_device_ops,
1977                                      lvars.module_vars, LUSTRE_MDT0_NAME);
1978         if (result == 0) {
1979                 type = class_get_type(LUSTRE_MDT0_NAME);
1980                 LASSERT(type != NULL);
1981                 type->typ_lu = &mdt_device_type;
1982                 result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu);
1983                 if (result != 0)
1984                         class_unregister_type(LUSTRE_MDT0_NAME);
1985         }
1986         return result;
1987 }
1988
1989 static void __exit mdt_mod_exit(void)
1990 {
1991         class_unregister_type(LUSTRE_MDT0_NAME);
1992 }
1993
1994 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1995 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1996 MODULE_LICENSE("GPL");
1997
1998 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1999                 "number of mdt service threads to start");
2000
2001 cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);