Whamcloud - gitweb
- introduce the struct md_create_spec. It contains the various type-dependent
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * MDT_FAIL_CHECK
46  */
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
53 #include <obd.h>
54 /* lu2dt_dev() */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include "mdt_internal.h"
58 #include <linux/lustre_acl.h>
59 /*
60  * Initialized in mdt_mod_init().
61  */
62 unsigned long mdt_num_threads;
63
64 /* ptlrpc request handler for MDT. All handlers are
65  * grouped into several slices - struct mdt_opc_slice,
66  * and stored in an array - mdt_handlers[].
67  */
68 struct mdt_handler {
69         /* The name of this handler. */
70         const char *mh_name;
71         /* Fail id for this handler, checked at the beginning of this handler.*/
72         int         mh_fail_id;
73         /* Operation code for this handler */
74         __u32       mh_opc;
75         /* flags are listed in enum mdt_handler_flags below. */
76         __u32       mh_flags;
77         /* The actual handler function to execute. */
78         int (*mh_act)(struct mdt_thread_info *info);
79         /* Request format for this request. */
80         const struct req_format *mh_fmt;
81 };
82
/* Flags stored in mdt_handler::mh_flags. */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         */
        HABEO_CORPUS = (1 << 0),
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = (1 << 1),
        /*
         * "habeo refero" == "I have a reply"
         *
         * The reply format of this request is fixed, so generic code can
         * pack the reply message.
         */
        HABEO_REFERO = (1 << 2),
        /*
         * "mutabor" == "I shall modify"
         *
         * The request modifies something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client as early as
         * possible.
         */
        MUTABOR      = (1 << 3)
};
112
113 struct mdt_opc_slice {
114         __u32               mos_opc_start;
115         int                 mos_opc_end;
116         struct mdt_handler *mos_hs;
117 };
118
119 static struct mdt_opc_slice mdt_handlers[];
120 static struct mdt_opc_slice mdt_readpage_handlers[];
121
122 static int                    mdt_handle    (struct ptlrpc_request *req);
123 static struct mdt_device     *mdt_dev       (struct lu_device *d);
124 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
125
126 static struct lu_object_operations mdt_obj_ops;
127
128
129 static int mdt_getstatus(struct mdt_thread_info *info)
130 {
131         struct md_device *next  = info->mti_mdt->mdt_child;
132         int               result;
133         struct mdt_body  *body;
134
135         ENTRY;
136
137         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
138                 result = -ENOMEM;
139         else {
140                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
141                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
142                                                     next, &body->fid1);
143                 if (result == 0)
144                         body->valid |= OBD_MD_FLID;
145         }
146
147         RETURN(result);
148 }
149
150 static int mdt_statfs(struct mdt_thread_info *info)
151 {
152         struct md_device  *next  = info->mti_mdt->mdt_child;
153         struct obd_statfs *osfs;
154         int                result;
155
156         ENTRY;
157
158         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
159                 result = -ENOMEM;
160         } else {
161                 osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
162                 /* XXX max_age optimisation is needed here. See mds_statfs */
163                 result = next->md_ops->mdo_statfs(info->mti_ctxt,
164                                                   next, &info->mti_sfs);
165                 statfs_pack(osfs, &info->mti_sfs);
166         }
167
168         RETURN(result);
169 }
170
171 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
172                         const struct lu_fid *fid)
173 {
174         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
175                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
176                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
177
178         if (!S_ISREG(attr->la_mode))
179                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
180                             OBD_MD_FLMTIME;
181
182         b->atime      = attr->la_atime;
183         b->mtime      = attr->la_mtime;
184         b->ctime      = attr->la_ctime;
185         b->mode       = attr->la_mode;
186         b->size       = attr->la_size;
187         b->blocks     = attr->la_blocks;
188         b->uid        = attr->la_uid;
189         b->gid        = attr->la_gid;
190         b->flags      = attr->la_flags;
191         b->nlink      = attr->la_nlink;
192
193         if (fid) {
194                 b->fid1 = *fid;
195                 b->valid |= OBD_MD_FLID;
196                 CDEBUG(D_INODE, ""DFID3": nlink=%d, mode=%o, size="LPU64"\n",
197                                 PFID3(fid), b->nlink, b->mode, b->size);
198         }
199 }
200
201 static inline int mdt_body_has_lov(const struct lu_attr *la,
202                                    const struct mdt_body *body)
203 {
204         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
205                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
206 }
207
208 static int mdt_getattr_internal(struct mdt_thread_info *info,
209                                 struct mdt_object *o, int offset)
210 {
211         struct md_object        *next = mdt_object_child(o);
212         const struct mdt_body   *reqbody = info->mti_body;
213         struct ptlrpc_request   *req = mdt_info_req(info);
214         struct md_attr          *ma = &info->mti_attr;
215         struct lu_attr          *la = &ma->ma_attr;
216         struct req_capsule      *pill = &info->mti_pill;
217         const struct lu_context *ctxt = info->mti_ctxt;
218         struct mdt_body         *repbody;
219         void                    *buffer;
220         int                     length;
221         int                     rc;
222         ENTRY;
223
224         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
225                 RETURN(-ENOMEM);
226         }
227         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
228         repbody->eadatasize = 0;
229         repbody->aclsize = 0;
230
231         ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
232         ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
233
234         rc = mo_attr_get(ctxt, next, &info->mti_attr);
235         if (rc == -EREMOTE) {
236                 /* This object is located on remote node.*/
237                 repbody->fid1 = *mdt_object_fid(o);
238                 repbody->valid |= OBD_MD_FLID;
239                 GOTO(shrink, rc = 0);
240         } else if (rc){
241                 CERROR("getattr error for "DFID3": %d\n",
242                         PFID3(mdt_object_fid(o)), rc);
243                 RETURN(rc);
244         }
245
246         if (ma->ma_valid & MA_INODE)
247                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
248
249         if (mdt_body_has_lov(la, reqbody)) {
250                 if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
251                         CDEBUG(D_INODE, "packing ea for "DFID3"\n",
252                                         PFID3(mdt_object_fid(o)));
253                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
254                         repbody->eadatasize = ma->ma_lmm_size;
255                         repbody->valid |= OBD_MD_FLEASIZE;
256                 }
257         } else if (S_ISLNK(la->la_mode) &&
258                           reqbody->valid & OBD_MD_LINKNAME) {
259                 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
260                 if (rc <= 0) {
261                         CERROR("readlink failed: %d\n", rc);
262                         rc = -EFAULT;
263                 } else {
264                         repbody->valid |= OBD_MD_LINKNAME;
265                         repbody->eadatasize = rc + 1;
266                         ((char*)ma->ma_lmm)[rc] = 0;        /* NULL terminate */
267                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
268                                         (char*)buffer, rc);
269                         rc = 0;
270                 }
271         }
272
273         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
274                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
275                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
276                 repbody->valid |= OBD_MD_FLMODEASIZE;
277                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & MAX_COOKIE"
278                                 " to : %d:%d\n",
279                                 repbody->max_mdsize,
280                                 repbody->max_cookiesize);
281         }
282
283 #ifdef CONFIG_FS_POSIX_ACL
284         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
285             (reqbody->valid & OBD_MD_FLACL)) {
286                 buffer = req_capsule_server_get(pill, &RMF_ACL);
287                 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
288                 if (length > 0) {
289                         rc = mo_xattr_get(ctxt, next, buffer,
290                                           length, XATTR_NAME_ACL_ACCESS);
291                         if (rc < 0) {
292                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
293                                         rc = 0;
294                                 else
295                                         CERROR("got acl size: %d\n", rc);
296                         } else {
297                                 repbody->aclsize = rc;
298                                 repbody->valid |= OBD_MD_FLACL;
299                         }
300                 }
301         }
302 #endif
303
304 shrink:
305         /* FIXME: determine the offset of MDT_MD. but it does not work */
306 /*
307         if (req_capsule_has_field(pill, &RMF_DLM_REP)) {
308                 offset = 2;
309         } else
310                 offset = 1;
311 */
312         lustre_shrink_reply(req, offset, repbody->eadatasize, 1);
313         if (repbody->eadatasize)
314                 offset ++;
315         lustre_shrink_reply(req, offset, repbody->aclsize, 0);
316         RETURN(rc);
317 }
318
319 static int mdt_getattr(struct mdt_thread_info *info)
320 {
321         int result;
322
323         LASSERT(info->mti_object != NULL);
324         LASSERT(lu_object_assert_exists(info->mti_ctxt,
325                                         &info->mti_object->mot_obj.mo_lu));
326         ENTRY;
327
328
329         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
330                              RCL_SERVER, info->mti_mdt->mdt_max_mdsize);
331
332         result = req_capsule_pack(&info->mti_pill);
333         if (result)
334                 RETURN(result);
335
336         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
337                 result = -ENOMEM;
338         } else {
339                 result = mdt_getattr_internal(info, info->mti_object, 1);
340         }
341         RETURN(result);
342 }
343
344 /*
345  * UPDATE lock should be taken against parent, and be release before exit;
346  * child_bits lock should be taken against child, and be returned back:
347  *            (1)normal request should release the child lock;
348  *            (2)intent request will grant the lock to client.
349  */
350 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
351                                  struct mdt_lock_handle *lhc,
352                                  __u64 child_bits,
353                                  struct ldlm_reply *ldlm_rep)
354 {
355         struct mdt_object *parent = info->mti_object;
356         struct mdt_object *child;
357         struct md_object  *next = mdt_object_child(info->mti_object);
358         struct lu_fid     *child_fid = &info->mti_tmp_fid1;
359         const char        *name;
360         int               result;
361         struct mdt_lock_handle *lhp;
362         ENTRY;
363
364         LASSERT(info->mti_object != NULL);
365         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
366         if (name == NULL)
367                 RETURN(-EFAULT);
368
369         CDEBUG(D_INODE, "getattr with lock for "DFID3"/%s, ldlm_rep = %p\n",
370                         PFID3(mdt_object_fid(parent)), name, ldlm_rep);
371
372         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
373         if (strlen(name) == 0) {
374                 /* only getattr on the child. parent is on another node. */
375                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
376                 child = parent;
377                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID3
378                                ", ldlm_rep=%p\n",
379                                PFID3(mdt_object_fid(child)), ldlm_rep);
380
381                 mdt_lock_handle_init(lhc);
382                 lhc->mlh_mode = LCK_CR;
383                 result = mdt_object_lock(info, child, lhc, child_bits);
384                 if (result != 0) {
385                         /* finally, we can get attr for child. */
386                         result = mdt_getattr_internal(info, child,
387                                                       ldlm_rep ? 2 : 1);
388                         if (result != 0)
389                                 mdt_object_unlock(info, child, lhc, 1);
390                 }
391                 GOTO(out, result);
392         }
393
394         /*step 1: lock parent */
395         lhp = &info->mti_lh[MDT_LH_PARENT];
396         lhp->mlh_mode = LCK_CR;
397         result = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
398         if (result != 0)
399                 RETURN(result);
400
401         /*step 2: lookup child's fid by name */
402         result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
403         if (result != 0) {
404                 if (result == -ENOENT)
405                         intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
406                 GOTO(out_parent, result);
407         } else
408                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
409         /*
410          *step 3: find the child object by fid & lock it.
411          *        regardless if it is local or remote.
412          */
413         mdt_lock_handle_init(lhc);
414         lhc->mlh_mode = LCK_CR;
415         child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
416         if (IS_ERR(child))
417                 GOTO(out_parent, result = PTR_ERR(child));
418
419         /* finally, we can get attr for child. */
420         result = mdt_getattr_internal(info, child,
421                                       ldlm_rep ? 2 : 1);
422         if (result != 0)
423                 mdt_object_unlock(info, child, lhc, 1);
424         else {
425                 struct ldlm_lock *lock;
426                 struct ldlm_res_id *res_id;
427                 lock = ldlm_handle2lock(&lhc->mlh_lh);
428                 if (lock) {
429                         res_id = &lock->l_resource->lr_name;
430                         LDLM_DEBUG(lock, "we will return this lock client\n");
431                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
432                                                 &lock->l_resource->lr_name),
433                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID3".\n",
434                          (unsigned long)res_id->name[0],
435                          (unsigned long)res_id->name[1],
436                          (unsigned long)res_id->name[2],
437                          PFID3(mdt_object_fid(child)));
438                         LDLM_LOCK_PUT(lock);
439                 }
440
441         }
442         mdt_object_put(info->mti_ctxt, child);
443
444         EXIT;
445 out_parent:
446         mdt_object_unlock(info, parent, lhp, 1);
447 out:
448         return result;
449 }
450
451 /* normal handler: should release the child lock */
452 static int mdt_getattr_name(struct mdt_thread_info *info)
453 {
454         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
455         int rc;
456
457         ENTRY;
458
459         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
460                              RCL_SERVER, info->mti_mdt->mdt_max_mdsize);
461
462         rc = req_capsule_pack(&info->mti_pill);
463         if (rc)
464                 RETURN(rc);
465         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
466         if (lustre_handle_is_used(&lhc->mlh_lh)) {
467                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
468                 lhc->mlh_lh.cookie = 0;
469         }
470         RETURN(rc);
471 }
472
473 static struct lu_device_operations mdt_lu_ops;
474
475 static int lu_device_is_mdt(struct lu_device *d)
476 {
477         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
478 }
479
480 static struct mdt_device *mdt_dev(struct lu_device *d)
481 {
482         LASSERT(lu_device_is_mdt(d));
483         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
484 }
485
486 static int mdt_connect(struct mdt_thread_info *info)
487 {
488         int result;
489         struct ptlrpc_request *req;
490
491         req = mdt_info_req(info);
492         result = target_handle_connect(req, mdt_handle);
493         if (result == 0) {
494                 LASSERT(req->rq_export != NULL);
495                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
496         }
497         return result;
498 }
499
/* Disconnect handler: forwards the request to target_handle_disconnect(). */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        struct ptlrpc_request *req = mdt_info_req(info);

        return target_handle_disconnect(req);
}
504
505 static int mdt_sendpage(struct mdt_thread_info *info,
506                         struct lu_rdpg *rdpg)
507 {
508         struct ptlrpc_request   *req = mdt_info_req(info);
509         struct ptlrpc_bulk_desc *desc;
510         struct l_wait_info       lwi;
511         int                      tmpcount;
512         int                      tmpsize;
513         int                      i;
514         int                      rc;
515         ENTRY;
516
517         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
518                                     MDS_BULK_PORTAL);
519         if (desc == NULL)
520                 GOTO(out, rc = -ENOMEM);
521
522         for (i = 0, tmpcount = rdpg->rp_count;
523                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
524                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
525                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
526         }
527
528         LASSERT(desc->bd_nob == rdpg->rp_count);
529         rc = ptlrpc_start_bulk_transfer(desc);
530         if (rc)
531                 GOTO(free_desc, rc);
532
533         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
534                 GOTO(abort_bulk, rc);
535
536         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
537         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
538         LASSERT (rc == 0 || rc == -ETIMEDOUT);
539
540         if (rc == 0) {
541                 if (desc->bd_success &&
542                     desc->bd_nob_transferred == rdpg->rp_count)
543                         GOTO(free_desc, rc);
544
545                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
546         }
547
548         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
549                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
550                   desc->bd_nob_transferred, rdpg->rp_count,
551                   req->rq_export->exp_client_uuid.uuid,
552                   req->rq_export->exp_connection->c_remote_uuid.uuid);
553
554         class_fail_export(req->rq_export);
555
556         EXIT;
557 abort_bulk:
558         ptlrpc_abort_bulk(desc);
559 free_desc:
560         ptlrpc_free_bulk(desc);
561 out:
562         return rc;
563 }
564
565 static int mdt_readpage(struct mdt_thread_info *info)
566 {
567         struct mdt_object *object = info->mti_object;
568         struct lu_rdpg    *rdpg = &info->mti_rdpg;
569         struct mdt_body   *reqbody;
570         struct mdt_body   *repbody;
571         int                rc;
572         int                i;
573         ENTRY;
574
575         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
576                 RETURN(-ENOMEM);
577
578         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
579         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
580         if (reqbody == NULL || repbody == NULL)
581                 RETURN(-EFAULT);
582
583         /*
584          * prepare @rdpg before calling lower layers and transfer itself. Here
585          * reqbody->size contains offset of where to start to read and
586          * reqbody->nlink contains number bytes to read.
587          */
588         rdpg->rp_hash = reqbody->size;
589         if ((__u64)rdpg->rp_hash != reqbody->size) {
590                 CERROR("Invalid hash: %#llx != %#llx\n",
591                        (__u64)rdpg->rp_hash, reqbody->size);
592                 RETURN(-EFAULT);
593         }
594         rdpg->rp_count  = reqbody->nlink;
595         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>> CFS_PAGE_SHIFT;
596         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
597         if (rdpg->rp_pages == NULL)
598                 RETURN(-ENOMEM);
599
600         for (i = 0; i < rdpg->rp_npages; ++i) {
601                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
602                 if (rdpg->rp_pages[i] == NULL)
603                         GOTO(free_rdpg, rc = -ENOMEM);
604         }
605
606         /* call lower layers to fill allocated pages with directory data */
607         rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
608         if (rc)
609                 GOTO(free_rdpg, rc);
610
611         /* send pages to client */
612         rc = mdt_sendpage(info, rdpg);
613
614         EXIT;
615 free_rdpg:
616         for (i = 0; i < rdpg->rp_npages; i++)
617                 if (rdpg->rp_pages[i] != NULL)
618                         __free_pages(rdpg->rp_pages[i], 0);
619         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
620         return rc;
621 }
622
623 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
624 {
625         int rc;
626         ENTRY;
627
628         OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_UNPACK, -EFAULT);
629
630         rc = mdt_reint_unpack(info, op);
631         if (rc == 0) {
632                 rc = mdt_reint_rec(info);
633         }
634
635         RETURN(rc);
636 }
637
638 static long mdt_reint_opcode(struct mdt_thread_info *info,
639                              const struct req_format **fmt)
640 {
641         __u32 *ptr;
642         long opc;
643
644         opc = -EFAULT;
645         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
646         if (ptr != NULL) {
647                 opc = *ptr;
648                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
649                 if (opc < REINT_MAX && fmt[opc] != NULL)
650                         req_capsule_extend(&info->mti_pill, fmt[opc]);
651                 else
652                         CERROR("Unsupported opc: %ld\n", opc);
653         }
654         return opc;
655 }
656
657 static int mdt_reint(struct mdt_thread_info *info)
658 {
659         long opc;
660         int  rc;
661
662         static const struct req_format *reint_fmts[REINT_MAX] = {
663                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
664                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
665                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
666                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
667                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
668                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
669         };
670
671         ENTRY;
672
673         opc = mdt_reint_opcode(info, reint_fmts);
674         if (opc >= 0) {
675                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
676
677                 rc = mdt_reint_internal(info, opc);
678
679         } else
680                 rc = opc;
681         RETURN(rc);
682 }
683
684 /* TODO these two methods not available now. */
685
/*
 * Meant to sync the whole device. Not implemented yet: reports success
 * without doing anything.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
691
/*
 * Meant to sync a single object. Not implemented yet: reports success
 * without doing anything.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
697
698 static int mdt_sync(struct mdt_thread_info *info)
699 {
700         struct req_capsule *pill = &info->mti_pill;
701         struct mdt_body *body;
702         int rc;
703         ENTRY;
704
705         /* The fid may be zero, so we req_capsule_set manually */
706         req_capsule_set(pill, &RQF_MDS_SYNC);
707
708         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
709         if (body == NULL)
710                 RETURN(-EINVAL);
711
712         if (fid_seq(&body->fid1) == 0) {
713                 /* sync the whole device */
714                 rc = req_capsule_pack(pill);
715                 if (rc == 0)
716                         rc = mdt_device_sync(info);
717         } else {
718                 /* sync an object */
719                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
720                 if (rc == 0) {
721                         rc = mdt_object_sync(info);
722                         if (rc == 0) {
723                                 struct md_object    *next;
724                                 const struct lu_fid *fid;
725                                 next = mdt_object_child(info->mti_object);
726                                 fid = mdt_object_fid(info->mti_object);
727                                 rc = mo_attr_get(info->mti_ctxt,
728                                                  next, &info->mti_attr);
729                                 if (rc == 0) {
730                                         body = req_capsule_server_get(pill,
731                                                                 &RMF_MDT_BODY);
732                                         mdt_pack_attr2body(body,
733                                                            &info->mti_attr.ma_attr,
734                                                            fid);
735                                 }
736                         }
737                 }
738         }
739         RETURN(rc);
740 }
741
742 static int mdt_handle_quotacheck(struct mdt_thread_info *info)
743 {
744         return -EOPNOTSUPP;
745 }
746
747 static int mdt_handle_quotactl(struct mdt_thread_info *info)
748 {
749         return -EOPNOTSUPP;
750 }
751
752 /*
753  * OBD PING and other handlers.
754  */
755
756 static int mdt_obd_ping(struct mdt_thread_info *info)
757 {
758         int result;
759         ENTRY;
760         result = target_handle_ping(mdt_info_req(info));
761         RETURN(result);
762 }
763
764 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
765 {
766         return -EOPNOTSUPP;
767 }
768
769 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
770 {
771         return -EOPNOTSUPP;
772 }
773
774
775 /*
776  * DLM handlers.
777  */
778
779 static struct ldlm_callback_suite cbs = {
780         .lcs_completion = ldlm_server_completion_ast,
781         .lcs_blocking   = ldlm_server_blocking_ast,
782         .lcs_glimpse    = NULL
783 };
784
785 static int mdt_enqueue(struct mdt_thread_info *info)
786 {
787         int result;
788         struct ptlrpc_request *req;
789
790         /*
791          * info->mti_dlm_req already contains swapped and (if necessary)
792          * converted dlm request.
793          */
794         LASSERT(info->mti_dlm_req != NULL);
795
796         req = mdt_info_req(info);
797         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
798         result = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
799                                       req, info->mti_dlm_req, &cbs);
800         return result ? : req->rq_status;
801 }
802
803 static int mdt_convert(struct mdt_thread_info *info)
804 {
805         int result;
806         struct ptlrpc_request *req;
807
808         LASSERT(info->mti_dlm_req);
809         req = mdt_info_req(info);
810         result = ldlm_handle_convert0(req, info->mti_dlm_req);
811         return result ? : req->rq_status;
812 }
813
814 static int mdt_bl_callback(struct mdt_thread_info *info)
815 {
816         CERROR("bl callbacks should not happen on MDS\n");
817         LBUG();
818         return -EOPNOTSUPP;
819 }
820
821 static int mdt_cp_callback(struct mdt_thread_info *info)
822 {
823         CERROR("cp callbacks should not happen on MDS\n");
824         LBUG();
825         return -EOPNOTSUPP;
826 }
827
828 /*
829  * Build (DLM) resource name from fid.
830  */
831 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
832                                        struct ldlm_res_id *name)
833 {
834         memset(name, 0, sizeof *name);
835         name->name[0] = fid_seq(f);
836         name->name[1] = fid_oid(f);
837         name->name[2] = fid_ver(f);
838         return name;
839 }
840
841 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
842 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
843              struct lustre_handle *lh, ldlm_mode_t mode,
844              ldlm_policy_data_t *policy,
845              struct ldlm_res_id *res_id)
846 {
847         int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
848         int rc;
849
850         LASSERT(ns != NULL);
851         LASSERT(lh != NULL);
852         LASSERT(f != NULL);
853
854         /* FIXME: is that correct to have @flags=0 here? */
855         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, res_id),
856                               LDLM_IBITS, policy, mode, &flags,
857                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
858                               NULL, NULL, 0, NULL, lh);
859         return rc == ELDLM_OK ? 0 : -EIO;
860 }
861
862 /* just call ldlm_lock_decref() if decref,
863  * else we only call ptlrpc_save_lock() to save this lock in req.
864  * when transaction committed, req will be released and lock will be released */
865 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
866                 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
867 {
868         {
869         /* FIXME: this is debug stuff, remove it later. */
870                 struct ldlm_lock *lock = ldlm_handle2lock(lh);
871                 if (!lock) {
872                         CERROR("invalid lock handle "LPX64, lh->cookie);
873                         LBUG();
874                 }
875                 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
876                 LDLM_LOCK_PUT(lock);
877         }
878         if (decref)
879                 ldlm_lock_decref(lh, mode);
880         else
881                 ptlrpc_save_lock(req, lh, mode);
882 }
883
884 static struct mdt_object *mdt_obj(struct lu_object *o)
885 {
886         LASSERT(lu_device_is_mdt(o->lo_dev));
887         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
888 }
889
890 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
891                                    struct mdt_device *d,
892                                    const struct lu_fid *f)
893 {
894         struct lu_object *o;
895         struct mdt_object *m;
896         ENTRY;
897
898         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
899         if (IS_ERR(o))
900                 m = (struct mdt_object *)o;
901         else
902                 m = mdt_obj(o);
903         RETURN(m);
904 }
905
906 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
907                     struct mdt_lock_handle *lh, __u64 ibits)
908 {
909         ldlm_policy_data_t *policy = &info->mti_policy;
910         struct ldlm_res_id *res_id = &info->mti_res_id;
911         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
912         int rc;
913         ENTRY;
914
915         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
916         LASSERT(lh->mlh_mode != LCK_MINMODE);
917
918         policy->l_inodebits.bits = ibits;
919
920         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, policy, res_id);
921         RETURN(rc);
922 }
923
924 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
925                        struct mdt_lock_handle *lh, int decref)
926 {
927         struct ptlrpc_request *req = mdt_info_req(info);
928         ENTRY;
929
930         if (lustre_handle_is_used(&lh->mlh_lh)) {
931                 fid_unlock(req, mdt_object_fid(o),
932                            &lh->mlh_lh, lh->mlh_mode, decref);
933                 lh->mlh_lh.cookie = 0;
934         }
935         EXIT;
936 }
937
938 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
939                                         const struct lu_fid *f,
940                                         struct mdt_lock_handle *lh,
941                                         __u64 ibits)
942 {
943         struct mdt_object *o;
944
945         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
946         if (!IS_ERR(o)) {
947                 int result;
948
949                 result = mdt_object_lock(info, o, lh, ibits);
950                 if (result != 0) {
951                         mdt_object_put(info->mti_ctxt, o);
952                         o = ERR_PTR(result);
953                 }
954         }
955         return o;
956 }
957
958 void mdt_object_unlock_put(struct mdt_thread_info * info,
959                            struct mdt_object * o,
960                            struct mdt_lock_handle *lh,
961                            int decref)
962 {
963         mdt_object_unlock(info, o, lh, decref);
964         mdt_object_put(info->mti_ctxt, o);
965 }
966
967 static struct mdt_handler *mdt_handler_find(__u32 opc,
968                                             struct mdt_opc_slice *supported)
969 {
970         struct mdt_opc_slice *s;
971         struct mdt_handler   *h;
972
973         h = NULL;
974         for (s = supported; s->mos_hs != NULL; s++) {
975                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
976                         h = s->mos_hs + (opc - s->mos_opc_start);
977                         if (h->mh_opc != 0)
978                                 LASSERT(h->mh_opc == opc);
979                         else
980                                 h = NULL; /* unsupported opc */
981                         break;
982                 }
983         }
984         return h;
985 }
986
987 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
988 {
989         return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
990 }
991
/*
 * Placeholder for resource-name compatibility handling of an incoming dlm
 * request. Nothing is implemented yet; always returns 0.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        /* XXX something... later. */
        return 0;
}
998
/*
 * Placeholder for compatibility handling of an outgoing dlm reply.
 * Nothing is implemented yet; always returns 0.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        /* XXX something... later. */
        return 0;
}
1004
1005 /*
1006  * Generic code handling requests that have struct mdt_body passed in:
1007  *
1008  *  - extract mdt_body from request and save it in @info, if present;
1009  *
1010  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1011  *  @info;
1012  *
1013  *  - if HABEO_CORPUS flag is set for this request type check whether object
1014  *  actually exists on storage (lu_object_exists()).
1015  *
1016  */
1017 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1018 {
1019         const struct mdt_body   *body;
1020         struct mdt_object       *obj;
1021         const struct lu_context *ctx;
1022         struct req_capsule      *pill;
1023         int                     result;
1024
1025         ctx = info->mti_ctxt;
1026         pill = &info->mti_pill;
1027
1028         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1029         if (body != NULL) {
1030                 if (fid_is_sane(&body->fid1)) {
1031                         obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
1032                         if (!IS_ERR(obj)) {
1033                                 if ((flags & HABEO_CORPUS) &&
1034                                     !lu_object_exists(ctx,
1035                                                       &obj->mot_obj.mo_lu)) {
1036                                         mdt_object_put(ctx, obj);
1037                                         result = -ENOENT;
1038                                 } else {
1039                                         info->mti_object = obj;
1040                                         result = 0;
1041                                 }
1042                         } else
1043                                 result = PTR_ERR(obj);
1044                 } else {
1045                         CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1));
1046                         result = -EINVAL;
1047                 }
1048         } else
1049                 result = -EFAULT;
1050         return result;
1051 }
1052
1053 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1054 {
1055         struct req_capsule *pill;
1056         int result;
1057
1058         ENTRY;
1059         pill = &info->mti_pill;
1060
1061         if (req_capsule_has_field(pill, &RMF_MDT_BODY))
1062                 result = mdt_body_unpack(info, flags);
1063         else
1064                 result = 0;
1065
1066         if (result == 0 && (flags & HABEO_REFERO))
1067                 result = req_capsule_pack(pill);
1068
1069         RETURN(result);
1070 }
1071
1072 /* FIXME: fake untill journal callback is OK.*/
1073 struct lu_context_key mdt_txn_key;
1074
1075 int mdt_update_last_transno(struct mdt_thread_info *info, int rc)
1076 {
1077         struct mdt_device     *mdt = info->mti_mdt;
1078         struct ptlrpc_request *req = mdt_info_req(info);
1079         struct obd_export     *exp = req->rq_export;
1080         __u64 last_transno;
1081         __u64 last_committed;
1082
1083         if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1084                 return -EFAULT;
1085         if (info->mti_trans_flags & MDT_NONEED_TANSNO)
1086                 return 0;
1087         
1088         last_committed = mdt->mdt_last_committed;
1089
1090         if (rc == 0) {
1091                 last_transno = info->mti_transno;
1092         } else {
1093                 if (info->mti_transno != 0)
1094                         CERROR("replay %s transno "LPU64" failed: rc %d\n",
1095                                libcfs_nid2str(exp->exp_connection->c_peer.nid),
1096                                info->mti_transno, rc);
1097                 last_transno = 0;
1098         }
1099         CDEBUG(D_INODE, "last_transno = %llu, last_committed = %llu\n",
1100                last_transno, last_committed);
1101
1102         req->rq_repmsg->transno = req->rq_transno = last_transno;
1103         req->rq_repmsg->last_xid = req->rq_xid;
1104         req->rq_repmsg->last_committed = last_committed;
1105         exp->exp_obd->obd_last_committed = last_committed;
1106         return 0;
1107 }
1108
1109 /*
1110  * Invoke handler for this request opc. Also do necessary preprocessing
1111  * (according to handler ->mh_flags), and post-processing (setting of
1112  * ->last_{xid,committed}).
1113  */
1114 static int mdt_req_handle(struct mdt_thread_info *info,
1115                           struct mdt_handler *h, struct ptlrpc_request *req)
1116 {
1117         int   result;
1118         __u32 flags;
1119
1120         ENTRY;
1121
1122         LASSERT(h->mh_act != NULL);
1123         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
1124         LASSERT(current->journal_info == NULL);
1125
1126         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1127
1128         if (h->mh_fail_id != 0)
1129                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
1130
1131         result = 0;
1132         flags = h->mh_flags;
1133         LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
1134
1135         if (h->mh_fmt != NULL) {
1136                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1137                 result = mdt_unpack_req_pack_rep(info, flags);
1138         }
1139
1140         if (result == 0 && flags & MUTABOR &&
1141             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1142                 result = -EROFS;
1143
1144         if (result == 0 && flags & HABEO_CLAVIS) {
1145                 struct ldlm_request *dlm_req;
1146
1147                 LASSERT(h->mh_fmt != NULL);
1148
1149                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1150                 if (dlm_req != NULL) {
1151                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1152                                 result = mdt_lock_resname_compat(info->mti_mdt,
1153                                                                  dlm_req);
1154                         info->mti_dlm_req = dlm_req;
1155                 } else {
1156                         CERROR("Can't unpack dlm request\n");
1157                         result = -EFAULT;
1158                 }
1159         }
1160
1161         if (result == 0)
1162                 /*
1163                  * Process request.
1164                  */
1165                 result = h->mh_act(info);
1166         /*
1167          * XXX result value is unconditionally shoved into ->rq_status
1168          * (original code sometimes placed error code into ->rq_status, and
1169          * sometimes returned it to the
1170          * caller). ptlrpc_server_handle_request() doesn't check return value
1171          * anyway.
1172          */
1173         req->rq_status = result;
1174         result = 0;
1175         LASSERT(current->journal_info == NULL);
1176
1177         if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname) {
1178                 struct ldlm_reply *dlm_rep;
1179
1180                 dlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1181                 if (dlm_rep != NULL)
1182                         result = mdt_lock_reply_compat(info->mti_mdt, dlm_rep);
1183         }
1184
1185         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1186
1187         if (h->mh_opc != MDS_DISCONNECT &&
1188             h->mh_opc != MDS_READPAGE &&
1189             h->mh_opc != LDLM_ENQUEUE) {
1190                         mdt_update_last_transno(info, result);
1191         }
1192         RETURN(result);
1193 }
1194
1195
1196 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1197 {
1198         lh->mlh_lh.cookie = 0ull;
1199         lh->mlh_mode = LCK_MINMODE;
1200 }
1201
1202 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1203 {
1204         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1205 }
1206
1207 static void mdt_thread_info_init(struct ptlrpc_request *req,
1208                                  struct mdt_thread_info *info)
1209 {
1210         int i;
1211
1212         memset(info, 0, sizeof(*info));
1213
1214         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1215         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1216                 info->mti_rep_buf_size[i] = -1;
1217
1218         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1219                 mdt_lock_handle_init(&info->mti_lh[i]);
1220
1221         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1222         info->mti_ctxt = req->rq_svc_thread->t_ctx;
1223         info->mti_transno = req->rq_reqmsg->transno;
1224         /* it can be NULL while CONNECT */
1225         if (req->rq_export)
1226                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1227         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1228                          info->mti_rep_buf_size);
1229 }
1230
1231 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1232 {
1233         int i;
1234
1235         req_capsule_fini(&info->mti_pill);
1236         if (info->mti_object != NULL) {
1237                 mdt_object_put(info->mti_ctxt, info->mti_object);
1238                 info->mti_object = NULL;
1239         }
1240         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1241                 mdt_lock_handle_fini(&info->mti_lh[i]);
1242 }
1243
1244 /* mds/handler.c */
1245 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1246                                        struct obd_device *obd, int *process);
1247 /*
1248  * Handle recovery. Return:
1249  *        +1: continue request processing;
1250  *       -ve: abort immediately with the given error code;
1251  *         0: send reply with error code in req->rq_status;
1252  */
1253 static int mdt_recovery(struct ptlrpc_request *req)
1254 {
1255         int recovering;
1256         int abort_recovery;
1257         struct obd_device *obd;
1258
1259         ENTRY;
1260
1261         if (req->rq_reqmsg->opc == MDS_CONNECT)
1262                 RETURN(+1);
1263
1264         if (req->rq_export == NULL) {
1265                 CERROR("operation %d on unconnected MDS from %s\n",
1266                        req->rq_reqmsg->opc,
1267                        libcfs_id2str(req->rq_peer));
1268                 req->rq_status = -ENOTCONN;
1269                 RETURN(-ENOTCONN);
1270         }
1271
1272         /* sanity check: if the xid matches, the request must be marked as a
1273          * resent or replayed */
1274         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
1275                       lustre_msg_get_flags(req->rq_reqmsg) &
1276                       (MSG_RESENT | MSG_REPLAY)),
1277                  "rq_xid "LPU64" matches last_xid, "
1278                  "expected RESENT flag\n", req->rq_xid);
1279
1280         /* else: note the opposite is not always true; a RESENT req after a
1281          * failover will usually not match the last_xid, since it was likely
1282          * never committed. A REPLAYed request will almost never match the
1283          * last xid, however it could for a committed, but still retained,
1284          * open. */
1285
1286         obd = req->rq_export->exp_obd;
1287
1288         /* Check for aborted recovery... */
1289         spin_lock_bh(&obd->obd_processing_task_lock);
1290         abort_recovery = obd->obd_abort_recovery;
1291         recovering = obd->obd_recovering;
1292         spin_unlock_bh(&obd->obd_processing_task_lock);
1293         if (abort_recovery) {
1294                 target_abort_recovery(obd);
1295         } else if (recovering) {
1296                 int rc;
1297                 int should_process;
1298
1299                 rc = mds_filter_recovery_request(req, obd, &should_process);
1300                 if (rc != 0 || !should_process) {
1301                         LASSERT(rc < 0);
1302                         RETURN(rc);
1303                 }
1304         }
1305         RETURN(+1);
1306 }
1307
1308 static int mdt_reply(struct ptlrpc_request *req, int result,
1309                      struct mdt_thread_info *info)
1310 {
1311         struct obd_device *obd;
1312         ENTRY;
1313
1314         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1315                 if (req->rq_reqmsg->opc != OBD_PING)
1316                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1317
1318                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1319                 if (obd && obd->obd_recovering) {
1320                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1321                         RETURN(target_queue_final_reply(req, result));
1322                 } else {
1323                         /* Lost a race with recovery; let the error path
1324                          * DTRT. */
1325                         result = req->rq_status = -ENOTCONN;
1326                 }
1327         }
1328         target_send_reply(req, result, info->mti_fail_id);
1329         RETURN(0);
1330 }
1331
1332 /* mds/handler.c */
1333 extern int mds_msg_check_version(struct lustre_msg *msg);
1334
1335 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info,
1336                        struct mdt_opc_slice *supported)
1337 {
1338         struct mdt_handler *h;
1339         struct lustre_msg  *msg;
1340         int                 result;
1341
1342         ENTRY;
1343
1344         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1345
1346         LASSERT(current->journal_info == NULL);
1347
1348         msg = req->rq_reqmsg;
1349         result = mds_msg_check_version(msg);
1350         if (result == 0) {
1351                 result = mdt_recovery(req);
1352                 switch (result) {
1353                 case +1:
1354                         h = mdt_handler_find(msg->opc, supported);
1355                         if (h != NULL)
1356                                 result = mdt_req_handle(info, h, req);
1357                         else {
1358                                 req->rq_status = -ENOTSUPP;
1359                                 result = ptlrpc_error(req);
1360                                 break;
1361                         }
1362                         /* fall through */
1363                 case 0:
1364                         result = mdt_reply(req, result, info);
1365                 }
1366         } else
1367                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1368         RETURN(result);
1369 }
1370
1371 /*
1372  * MDT handler function called by ptlrpc service thread when request comes.
1373  *
1374  * XXX common "target" functionality should be factored into separate module
1375  * shared by mdt, ost and stand-alone services like fld.
1376  */
1377 static int mdt_handle_common(struct ptlrpc_request *req,
1378                              struct mdt_opc_slice *supported)
1379 {
1380         struct lu_context      *ctx;
1381         struct mdt_thread_info *info;
1382         int                     result;
1383         ENTRY;
1384
1385         ctx = req->rq_svc_thread->t_ctx;
1386         LASSERT(ctx != NULL);
1387         LASSERT(ctx->lc_thread == req->rq_svc_thread);
1388         info = lu_context_key_get(ctx, &mdt_thread_key);
1389         LASSERT(info != NULL);
1390
1391         mdt_thread_info_init(req, info);
1392
1393         result = mdt_handle0(req, info, supported);
1394
1395         mdt_thread_info_fini(info);
1396         RETURN(result);
1397 }
1398
1399 static int mdt_handle(struct ptlrpc_request *req)
1400 {
1401         return mdt_handle_common(req, mdt_handlers);
1402 }
1403
1404 static int mdt_readpage_handle(struct ptlrpc_request *req)
1405 {
1406         return mdt_handle_common(req, mdt_readpage_handlers);
1407 }
1408
1409 /*Please move these functions from mds to mdt*/
1410 int intent_disposition(struct ldlm_reply *rep, int flag)
1411 {
1412         if (!rep)
1413                 return 0;
1414         return (rep->lock_policy_res1 & flag);
1415 }
1416
1417 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1418 {
1419         if (!rep)
1420                 return;
1421         rep->lock_policy_res1 |= flag;
1422 }
1423
/*
 * Dense intent codes used to index mdt_it_flavor[]; mdt_intent_code() maps
 * the IT_* values received from the client onto them.
 */
enum mdt_it_code {
        MDT_IT_OPEN,            /* IT_OPEN */
        MDT_IT_OCREAT,          /* IT_OPEN|IT_CREAT */
        MDT_IT_CREATE,          /* IT_CREAT */
        MDT_IT_GETATTR,         /* IT_GETATTR */
        MDT_IT_READDIR,         /* IT_READDIR */
        MDT_IT_LOOKUP,          /* IT_LOOKUP */
        MDT_IT_UNLINK,          /* IT_UNLINK */
        MDT_IT_TRUNC,           /* IT_TRUNC */
        MDT_IT_GETXATTR,        /* IT_GETXATTR */
        MDT_IT_NR               /* number of codes above */
};
1436
/* Intent handlers referenced from mdt_it_flavor[]; defined further down. */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **lockp,
                              int flags);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **lockp,
                            int flags);
1445
1446 static struct mdt_it_flavor {
1447         const struct req_format *it_fmt;
1448         __u32                    it_flags;
1449         int                    (*it_act)(enum mdt_it_code ,
1450                                          struct mdt_thread_info *,
1451                                          struct ldlm_lock **,
1452                                          int);
1453         long                     it_reint;
1454 } mdt_it_flavor[] = {
1455         [MDT_IT_OPEN]     = {
1456                 .it_fmt   = &RQF_LDLM_INTENT,
1457                 /*.it_flags = HABEO_REFERO,*/
1458                 .it_flags = 0,
1459                 .it_act   = mdt_intent_reint,
1460                 .it_reint = REINT_OPEN
1461         },
1462         [MDT_IT_OCREAT]   = {
1463                 .it_fmt   = &RQF_LDLM_INTENT,
1464                 .it_flags = MUTABOR,
1465                 .it_act   = mdt_intent_reint,
1466                 .it_reint = REINT_OPEN
1467         },
1468         [MDT_IT_CREATE]   = {
1469                 .it_fmt   = &RQF_LDLM_INTENT,
1470                 .it_flags = MUTABOR,
1471                 .it_act   = mdt_intent_reint,
1472                 .it_reint = REINT_CREATE
1473         },
1474         [MDT_IT_GETATTR]  = {
1475                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1476                 .it_flags = 0,
1477                 .it_act   = mdt_intent_getattr
1478         },
1479         [MDT_IT_READDIR]  = {
1480                 .it_fmt   = NULL,
1481                 .it_flags = 0,
1482                 .it_act   = NULL
1483         },
1484         [MDT_IT_LOOKUP]   = {
1485                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1486                 .it_flags = 0,
1487                 .it_act   = mdt_intent_getattr
1488         },
1489         [MDT_IT_UNLINK]   = {
1490                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
1491                 .it_flags = MUTABOR,
1492                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
1493                 .it_reint = REINT_UNLINK
1494         },
1495         [MDT_IT_TRUNC]    = {
1496                 .it_fmt   = NULL,
1497                 .it_flags = MUTABOR,
1498                 .it_act   = NULL
1499         },
1500         [MDT_IT_GETXATTR] = {
1501                 .it_fmt   = NULL,
1502                 .it_flags = 0,
1503                 .it_act   = NULL
1504         }
1505 };
1506
1507 static int mdt_intent_getattr(enum mdt_it_code opcode,
1508                               struct mdt_thread_info *info,
1509                               struct ldlm_lock **lockp,
1510                               int flags)
1511 {
1512         struct ldlm_lock       *old_lock = *lockp;
1513         struct ldlm_lock       *new_lock = NULL;
1514         struct ptlrpc_request  *req = mdt_info_req(info);
1515         struct ldlm_reply      *ldlm_rep;
1516         struct mdt_lock_handle  tmp_lock;
1517         struct mdt_lock_handle *lhc = &tmp_lock;
1518         struct mdt_device      *mdt = info->mti_mdt;
1519         __u64                   child_bits;
1520         int                     rc;
1521
1522         ENTRY;
1523
1524         switch (opcode) {
1525         case MDT_IT_LOOKUP:
1526                 child_bits = MDS_INODELOCK_LOOKUP;
1527                 break;
1528         case MDT_IT_GETATTR:
1529                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1530                 break;
1531         default:
1532                 CERROR("Unhandled till now");
1533                 RETURN(-EINVAL);
1534         }
1535
1536         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
1537                              RCL_SERVER, mdt->mdt_max_mdsize);
1538
1539         rc = req_capsule_pack(&info->mti_pill);
1540         if (rc)
1541                 RETURN(rc);
1542         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1543         intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
1544
1545         ldlm_rep->lock_policy_res2 =
1546                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
1547
1548         if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1549                 ldlm_rep->lock_policy_res2 = 0;
1550         if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1551                     ldlm_rep->lock_policy_res2) {
1552                 RETURN(ELDLM_LOCK_ABORTED);
1553         }
1554
1555         new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1556         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1557                 RETURN(0);
1558
1559         LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1560                  opcode, lhc->mlh_lh.cookie);
1561
1562         *lockp = new_lock;
1563
1564         /* FIXME:This only happens when MDT can handle RESENT */
1565         if (new_lock->l_export == req->rq_export) {
1566                 /* Already gave this to the client, which means that we
1567                  * reconstructed a reply. */
1568                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1569                         MSG_RESENT);
1570                 RETURN(ELDLM_LOCK_REPLACED);
1571         }
1572
1573         /* TODO:
1574          * These are copied from mds/hander.c, and should be factored into
1575          * ldlm module in order to share these code, and be easy for merge.
1576          */
1577
1578         /* Fixup the lock to be given to the client */
1579         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1580         new_lock->l_readers = 0;
1581         new_lock->l_writers = 0;
1582
1583         new_lock->l_export = class_export_get(req->rq_export);
1584         list_add(&new_lock->l_export_chain,
1585                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1586
1587         new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1588         new_lock->l_completion_ast = old_lock->l_completion_ast;
1589
1590         new_lock->l_remote_handle = old_lock->l_remote_handle;
1591
1592         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1593
1594         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1595         LDLM_LOCK_PUT(new_lock);
1596
1597         RETURN(ELDLM_LOCK_REPLACED);
1598 }
1599
1600 static int mdt_intent_reint(enum mdt_it_code opcode,
1601                             struct mdt_thread_info *info,
1602                             struct ldlm_lock **lockp,
1603                             int flags)
1604 {
1605         long opc;
1606         int rc;
1607         struct ldlm_reply *rep;
1608
1609         static const struct req_format *intent_fmts[REINT_MAX] = {
1610                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
1611                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
1612         };
1613
1614         ENTRY;
1615
1616         opc = mdt_reint_opcode(info, intent_fmts);
1617         if (opc < 0)
1618                 RETURN(opc);
1619
1620         if (mdt_it_flavor[opcode].it_reint != opc) {
1621                 CERROR("Reint code %ld doesn't match intent: %d\n",
1622                        opc, opcode);
1623                 RETURN(-EPROTO);
1624         }
1625
1626         rc = mdt_reint_internal(info, opc);
1627
1628         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1629         if (rep == NULL)
1630                 RETURN(-EFAULT);
1631         rep->lock_policy_res2 = rc;
1632
1633         intent_set_disposition(rep, DISP_IT_EXECD);
1634
1635         mdt_update_last_transno(info, rep->lock_policy_res2);
1636
1637         RETURN(ELDLM_LOCK_ABORTED);
1638 }
1639
1640 static int mdt_intent_code(long itcode)
1641 {
1642         int result;
1643
1644         switch(itcode) {
1645         case IT_OPEN:
1646                 result = MDT_IT_OPEN;
1647                 break;
1648         case IT_OPEN|IT_CREAT:
1649                 result = MDT_IT_OCREAT;
1650                 break;
1651         case IT_CREAT:
1652                 result = MDT_IT_CREATE;
1653                 break;
1654         case IT_READDIR:
1655                 result = MDT_IT_READDIR;
1656                 break;
1657         case IT_GETATTR:
1658                 result = MDT_IT_GETATTR;
1659                 break;
1660         case IT_LOOKUP:
1661                 result = MDT_IT_LOOKUP;
1662                 break;
1663         case IT_UNLINK:
1664                 result = MDT_IT_UNLINK;
1665                 break;
1666         case IT_TRUNC:
1667                 result = MDT_IT_TRUNC;
1668                 break;
1669         case IT_GETXATTR:
1670                 result = MDT_IT_GETXATTR;
1671                 break;
1672         default:
1673                 CERROR("Unknown intent opcode: %ld\n", itcode);
1674                 result = -EINVAL;
1675                 break;
1676         }
1677         return result;
1678 }
1679
1680 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1681                           struct ldlm_lock **lockp, int flags)
1682 {
1683         struct req_capsule   *pill;
1684         struct mdt_it_flavor *flv;
1685         int opc;
1686         int rc;
1687         ENTRY;
1688
1689         opc = mdt_intent_code(itopc);
1690         if (opc < 0)
1691                 RETURN(-EINVAL);
1692
1693         pill = &info->mti_pill;
1694         flv  = &mdt_it_flavor[opc];
1695
1696         if (flv->it_fmt != NULL)
1697                 req_capsule_extend(pill, flv->it_fmt);
1698
1699         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1700         if (rc == 0) {
1701                 struct ptlrpc_request *req = mdt_info_req(info);
1702                 if (flv->it_flags & MUTABOR &&
1703                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1704                         rc = -EROFS;
1705         }
1706         if (rc == 0 && flv->it_act != NULL) {
1707                 /* execute policy */
1708                 rc = flv->it_act(opc, info, lockp, flags);
1709         } else
1710                 rc = -EOPNOTSUPP;
1711         RETURN(rc);
1712 }
1713
1714 static int mdt_intent_policy(struct ldlm_namespace *ns,
1715                              struct ldlm_lock **lockp, void *req_cookie,
1716                              ldlm_mode_t mode, int flags, void *data)
1717 {
1718         struct mdt_thread_info *info;
1719         struct ptlrpc_request  *req  =  req_cookie;
1720         struct ldlm_intent     *it;
1721         struct req_capsule     *pill;
1722         struct ldlm_lock       *lock = *lockp;
1723         int rc;
1724
1725         ENTRY;
1726
1727         LASSERT(req != NULL);
1728
1729         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1730         LASSERT(info != NULL);
1731         pill = &info->mti_pill;
1732         LASSERT(pill->rc_req == req);
1733
1734         if (req->rq_reqmsg->bufcount > MDS_REQ_INTENT_IT_OFF) {
1735                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1736                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1737                 if (it != NULL) {
1738                         LDLM_DEBUG(lock, "intent policy opc: %s",
1739                                    ldlm_it2str(it->opc));
1740
1741                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
1742                         if (rc == 0)
1743                                 rc = ELDLM_OK;
1744                 } else
1745                         rc = -EFAULT;
1746         } else {
1747                 /* No intent was provided */
1748                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1749                 rc = req_capsule_pack(pill);
1750         }
1751         RETURN(rc);
1752 }
1753
1754 /*
1755  * Seq wrappers
1756  */
1757 static int mdt_seq_fini(const struct lu_context *ctx,
1758                         struct mdt_device *m)
1759 {
1760         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1761         ENTRY;
1762
1763         if (ls && ls->ls_server_seq) {
1764                 seq_server_fini(ls->ls_server_seq, ctx);
1765                 OBD_FREE_PTR(ls->ls_server_seq);
1766                 ls->ls_server_seq = NULL;
1767         }
1768         if (ls && ls->ls_control_seq) {
1769                 seq_server_fini(ls->ls_control_seq, ctx);
1770                 OBD_FREE_PTR(ls->ls_control_seq);
1771                 ls->ls_control_seq = NULL;
1772         }
1773         RETURN(0);
1774 }
1775
1776 static int mdt_seq_init(const struct lu_context *ctx,
1777                         const char *uuid,
1778                         struct mdt_device *m)
1779 {
1780         struct lu_site *ls;
1781         int rc;
1782         ENTRY;
1783
1784         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1785
1786         /* sequence-controller node */
1787         if (ls->ls_node_id == 0) {
1788                 LASSERT(ls->ls_control_seq == NULL);
1789                 OBD_ALLOC_PTR(ls->ls_control_seq);
1790
1791                 if (ls->ls_control_seq != NULL) {
1792                         rc = seq_server_init(ls->ls_control_seq,
1793                                              m->mdt_bottom, uuid,
1794                                              LUSTRE_SEQ_CTLR,
1795                                              ctx);
1796                 } else
1797                         rc = -ENOMEM;
1798         }
1799
1800         LASSERT(ls->ls_server_seq == NULL);
1801         OBD_ALLOC_PTR(ls->ls_server_seq);
1802
1803         if (ls->ls_server_seq != NULL) {
1804                 rc = seq_server_init(ls->ls_server_seq,
1805                                      m->mdt_bottom, uuid,
1806                                      LUSTRE_SEQ_SRV,
1807                                      ctx);
1808         } else
1809                 rc = -ENOMEM;
1810
1811         if (rc)
1812                 mdt_seq_fini(ctx, m);
1813
1814         RETURN(rc);
1815 }
1816
1817 /*
1818  * Init client sequence manager which is used by local MDS to talk to sequence
1819  * controller on remote node.
1820  */
1821 static int mdt_seq_init_cli(const struct lu_context *ctx,
1822                             struct mdt_device *m,
1823                             struct lustre_cfg *cfg)
1824 {
1825         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1826         struct obd_device *mdc;
1827         struct obd_uuid   *uuidp;
1828         char              *uuid_str;
1829         int               rc;
1830         int               index;
1831         struct mdt_thread_info *info;
1832         char *p, *index_string = lustre_cfg_string(cfg, 2);
1833         ENTRY;
1834
1835         info = lu_context_key_get(ctx, &mdt_thread_key);
1836         uuidp = &info->mti_u.uuid;
1837
1838         LASSERT(index_string);
1839
1840         index = simple_strtol(index_string, &p, 10);
1841         if (*p) {
1842                 CERROR("Invalid index in lustre_cgf, offset 2\n");
1843                 RETURN(-EINVAL);
1844         }
1845
1846         /* check if this is first MDC add and controller is not yet
1847          * initialized. */
1848         if (index != 0 || ls->ls_client_exp)
1849                 RETURN(0);
1850
1851         uuid_str = lustre_cfg_string(cfg, 1);
1852         obd_str2uuid(uuidp, uuid_str);
1853         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, NULL);
1854         if (!mdc) {
1855                 CERROR("can't find controller MDC by uuid %s\n",
1856                        uuid_str);
1857                 rc = -ENOENT;
1858         } else if (!mdc->obd_set_up) {
1859                 CERROR("target %s not set up\n", mdc->obd_name);
1860                 rc = -EINVAL;
1861         } else {
1862                 struct lustre_handle conn = {0, };
1863
1864                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
1865                        mdc->obd_name, mdc->obd_uuid.uuid);
1866
1867                 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
1868
1869                 if (rc) {
1870                         CERROR("target %s connect error %d\n",
1871                                mdc->obd_name, rc);
1872                 } else {
1873                         ls->ls_client_exp = class_conn2export(&conn);
1874
1875                         OBD_ALLOC_PTR(ls->ls_client_seq);
1876
1877                         if (ls->ls_client_seq != NULL) {
1878                                 rc = seq_client_init(ls->ls_client_seq,
1879                                                      mdc->obd_name,
1880                                                      ls->ls_client_exp);
1881                         } else
1882                                 rc = -ENOMEM;
1883
1884                         if (rc)
1885                                 RETURN(rc);
1886
1887                         LASSERT(ls->ls_server_seq != NULL);
1888
1889                         rc = seq_server_set_cli(ls->ls_server_seq,
1890                                                 ls->ls_client_seq,
1891                                                 ctx);
1892                 }
1893         }
1894
1895         RETURN(rc);
1896 }
1897
1898 static void mdt_seq_fini_cli(struct mdt_device *m)
1899 {
1900         struct lu_site *ls;
1901
1902         ENTRY;
1903
1904         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1905
1906         if (ls && ls->ls_server_seq)
1907                 seq_server_set_cli(ls->ls_server_seq,
1908                                    NULL, NULL);
1909
1910         if (ls && ls->ls_client_seq) {
1911                 seq_client_fini(ls->ls_client_seq);
1912                 OBD_FREE_PTR(ls->ls_client_seq);
1913                 ls->ls_client_seq = NULL;
1914         }
1915
1916         if (ls && ls->ls_client_exp) {
1917                 int rc = obd_disconnect(ls->ls_client_exp);
1918                 ls->ls_client_exp = NULL;
1919
1920                 if (rc) {
1921                         CERROR("failure to disconnect "
1922                                "obd: %d\n", rc);
1923                 }
1924         }
1925         EXIT;
1926 }
1927
1928 /*
1929  * FLD wrappers
1930  */
1931 static int mdt_fld_init(const struct lu_context *ctx,
1932                         const char *uuid,
1933                         struct mdt_device *m)
1934 {
1935         struct lu_site *ls;
1936         int rc;
1937         ENTRY;
1938
1939         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1940
1941         OBD_ALLOC_PTR(ls->ls_server_fld);
1942
1943         if (ls->ls_server_fld != NULL) {
1944                 rc = fld_server_init(ls->ls_server_fld, ctx,
1945                                      m->mdt_bottom, uuid);
1946                 if (rc) {
1947                         OBD_FREE_PTR(ls->ls_server_fld);
1948                         ls->ls_server_fld = NULL;
1949                 }
1950         } else
1951                 rc = -ENOMEM;
1952
1953         RETURN(rc);
1954 }
1955
1956 static int mdt_fld_fini(const struct lu_context *ctx,
1957                         struct mdt_device *m)
1958 {
1959         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1960         ENTRY;
1961
1962         if (ls && ls->ls_server_fld) {
1963                 fld_server_fini(ls->ls_server_fld, ctx);
1964                 OBD_FREE_PTR(ls->ls_server_fld);
1965                 ls->ls_server_fld = NULL;
1966         }
1967         RETURN(0);
1968 }
1969
1970 /* device init/fini methods */
1971 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1972 {
1973         if (m->mdt_service != NULL) {
1974                 ptlrpc_unregister_service(m->mdt_service);
1975                 m->mdt_service = NULL;
1976         }
1977         if (m->mdt_readpage_service != NULL) {
1978                 ptlrpc_unregister_service(m->mdt_readpage_service);
1979                 m->mdt_readpage_service = NULL;
1980         }
1981         if (m->mdt_setattr_service != NULL) {
1982                 ptlrpc_unregister_service(m->mdt_setattr_service);
1983                 m->mdt_setattr_service = NULL;
1984         }
1985 }
1986
1987 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1988 {
1989         int rc;
1990         struct ptlrpc_service_conf conf = {
1991                 .psc_nbufs            = MDS_NBUFS,
1992                 .psc_bufsize          = MDS_BUFSIZE,
1993                 .psc_max_req_size     = MDS_MAXREQSIZE,
1994                 .psc_max_reply_size   = MDS_MAXREPSIZE,
1995                 .psc_req_portal       = MDS_REQUEST_PORTAL,
1996                 .psc_rep_portal       = MDC_REPLY_PORTAL,
1997                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
1998                 /*
1999                  * We'd like to have a mechanism to set this on a per-device
2000                  * basis, but alas...
2001                  */
2002                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2003                                        MDT_MAX_THREADS),
2004                 .psc_ctx_tags      = LCT_MD_THREAD
2005         };
2006
2007         ENTRY;
2008
2009         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2010         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2011                            "mdt_ldlm_client", m->mdt_ldlm_client);
2012
2013         m->mdt_service =
2014                 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
2015                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2016                                      NULL);
2017         if (m->mdt_service == NULL)
2018                 RETURN(-ENOMEM);
2019
2020         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
2021         if (rc)
2022                 GOTO(err_mdt_svc, rc);
2023
2024         /*
2025          * readpage service configuration. Parameters have to be adjusted,
2026          * ideally.
2027          */
2028         conf = (typeof(conf)) {
2029                 .psc_nbufs            = MDS_NBUFS,
2030                 .psc_bufsize          = MDS_BUFSIZE,
2031                 .psc_max_req_size     = MDS_MAXREQSIZE,
2032                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2033                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2034                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2035                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2036                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2037                                        MDT_MAX_THREADS),
2038                 .psc_ctx_tags      = LCT_MD_THREAD
2039         };
2040         m->mdt_readpage_service =
2041                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2042                                      LUSTRE_MDT0_NAME "_readpage",
2043                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2044                                      NULL);
2045
2046         if (m->mdt_readpage_service == NULL) {
2047                 CERROR("failed to start readpage service\n");
2048                 GOTO(err_mdt_svc, rc = -ENOMEM);
2049         }
2050
2051         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2052
2053         /*
2054          * setattr service configuration.
2055          */
2056         conf = (typeof(conf)) {
2057                 .psc_nbufs            = MDS_NBUFS,
2058                 .psc_bufsize          = MDS_BUFSIZE,
2059                 .psc_max_req_size     = MDS_MAXREQSIZE,
2060                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2061                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2062                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2063                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2064                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2065                                        MDT_MAX_THREADS),
2066                 .psc_ctx_tags      = LCT_MD_THREAD
2067         };
2068
2069         m->mdt_setattr_service =
2070                 ptlrpc_init_svc_conf(&conf, mdt_handle,
2071                                      LUSTRE_MDT0_NAME "_setattr",
2072                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2073                                      NULL);
2074
2075         if (!m->mdt_setattr_service) {
2076                 CERROR("failed to start setattr service\n");
2077                 GOTO(err_mdt_svc, rc = -ENOMEM);
2078         }
2079
2080         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2081         if (rc)
2082                 GOTO(err_mdt_svc, rc);
2083
2084         EXIT;
2085 err_mdt_svc:
2086         if (rc)
2087                 mdt_stop_ptlrpc_service(m);
2088
2089         return rc;
2090 }
2091
2092 static void mdt_stack_fini(const struct lu_context *ctx,
2093                            struct mdt_device *m, struct lu_device *d)
2094 {
2095         /* goes through all stack */
2096         while (d != NULL) {
2097                 struct lu_device *n;
2098                 struct obd_type *type;
2099                 struct lu_device_type *ldt = d->ld_type;
2100
2101                 lu_device_put(d);
2102
2103                 /* each fini() returns next device in stack of layers
2104                  * * so we can avoid the recursion */
2105                 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2106                 ldt->ldt_ops->ldto_device_free(ctx, d);
2107
2108                 type = ldt->ldt_obd_type;
2109                 type->typ_refcnt--;
2110                 class_put_type(type);
2111                 /* switch to the next device in the layer */
2112                 d = n;
2113         }
2114         m->mdt_child = NULL;
2115 }
2116
2117 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2118                                          const char *typename,
2119                                          struct lu_device *child,
2120                                          struct lustre_cfg *cfg)
2121 {
2122         struct obd_type       *type;
2123         struct lu_device_type *ldt;
2124         struct lu_device      *d;
2125         int rc;
2126
2127         /* find the type */
2128         type = class_get_type(typename);
2129         if (!type) {
2130                 CERROR("Unknown type: '%s'\n", typename);
2131                 GOTO(out, rc = -ENODEV);
2132         }
2133
2134         ldt = type->typ_lu;
2135         if (ldt == NULL) {
2136                 CERROR("type: '%s'\n", typename);
2137                 GOTO(out_type, rc = -EINVAL);
2138         }
2139
2140         ldt->ldt_obd_type = type;
2141         d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2142         if (IS_ERR(d)) {
2143                 CERROR("Cannot allocate device: '%s'\n", typename);
2144                 GOTO(out_type, rc = -ENODEV);
2145         }
2146
2147         LASSERT(child->ld_site);
2148         d->ld_site = child->ld_site;
2149
2150         type->typ_refcnt++;
2151         rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2152         if (rc) {
2153                 CERROR("can't init device '%s', rc %d\n", typename, rc);
2154                 GOTO(out_alloc, rc);
2155         }
2156         lu_device_get(d);
2157
2158         RETURN(d);
2159 out_alloc:
2160         ldt->ldt_ops->ldto_device_free(ctx, d);
2161         type->typ_refcnt--;
2162 out_type:
2163         class_put_type(type);
2164 out:
2165         return ERR_PTR(rc);
2166 }
2167
2168 static int mdt_stack_init(const struct lu_context *ctx,
2169                           struct mdt_device *m, struct lustre_cfg *cfg)
2170 {
2171         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
2172         struct lu_device  *tmp;
2173         struct md_device *md;
2174         int rc;
2175         ENTRY;
2176
2177         /* init the stack */
2178         tmp = mdt_layer_setup(ctx, LUSTRE_OSD0_NAME, d, cfg);
2179         if (IS_ERR(tmp)) {
2180                 RETURN(PTR_ERR(tmp));
2181         }
2182         m->mdt_bottom = lu2dt_dev(tmp);
2183         d = tmp;
2184         tmp = mdt_layer_setup(ctx, LUSTRE_MDD0_NAME, d, cfg);
2185         if (IS_ERR(tmp)) {
2186                 GOTO(out, rc = PTR_ERR(tmp));
2187         }
2188         d = tmp;
2189         md = lu2md_dev(d);
2190
2191         tmp = mdt_layer_setup(ctx, LUSTRE_CMM0_NAME, d, cfg);
2192         if (IS_ERR(tmp)) {
2193                 GOTO(out, rc = PTR_ERR(tmp));
2194         }
2195         d = tmp;
2196         /*set mdd upcall device*/
2197         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2198
2199         md = lu2md_dev(d);
2200         /*set cmm upcall device*/
2201         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2202
2203         m->mdt_child = lu2md_dev(d);
2204
2205         /* process setup config */
2206         tmp = &m->mdt_md_dev.md_lu_dev;
2207         rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2208
2209         GOTO(out, rc);
2210 out:
2211         /* fini from last known good lu_device */
2212         if (rc)
2213                 mdt_stack_fini(ctx, m, d);
2214
2215         return rc;
2216 }
2217
2218 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2219 {
2220         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2221         struct lu_site   *ls = d->ld_site;
2222
2223         ENTRY;
2224
2225
2226         mdt_fs_cleanup(ctx, m);
2227         ping_evictor_stop();
2228         mdt_stop_ptlrpc_service(m);
2229
2230         /* finish the stack */
2231         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2232
2233         mdt_fld_fini(ctx, m);
2234         mdt_seq_fini(ctx, m);
2235         mdt_seq_fini_cli(m);
2236
2237         LASSERT(atomic_read(&d->ld_ref) == 0);
2238         md_device_fini(&m->mdt_md_dev);
2239
2240         if (m->mdt_namespace != NULL) {
2241                 ldlm_namespace_free(m->mdt_namespace, 0);
2242                 m->mdt_namespace = NULL;
2243         }
2244
2245         if (ls) {
2246                 lu_site_fini(ls);
2247                 OBD_FREE_PTR(ls);
2248         }
2249
2250         EXIT;
2251 }
2252
2253 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2254                      struct lu_device_type *t, struct lustre_cfg *cfg)
2255 {
2256         struct mdt_thread_info *info;
2257         struct obd_device      *obd;
2258         const char             *dev = lustre_cfg_string(cfg, 0);
2259         const char             *num = lustre_cfg_string(cfg, 2);
2260         struct lu_site         *s;
2261         int                     rc;
2262         ENTRY;
2263
2264         info = lu_context_key_get(ctx, &mdt_thread_key);
2265         LASSERT(info != NULL);
2266
2267         obd = class_name2obd(dev);
2268
2269         spin_lock_init(&m->mdt_transno_lock);
2270         /* FIXME: We need to load them from disk. But now fake it */
2271         m->mdt_last_transno = 1;
2272         m->mdt_last_committed = 1;
2273         m->mdt_max_mdsize = MAX_MD_SIZE;
2274         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2275
2276         spin_lock_init(&m->mdt_epoch_lock);
2277         /* Temporary. should parse mount option. */
2278         m->mdt_opts.mo_user_xattr = 0;
2279         m->mdt_opts.mo_acl = 0;
2280         m->mdt_opts.mo_compat_resname = 0;
2281         obd->obd_replayable = 1;
2282
2283
2284         OBD_ALLOC_PTR(s);
2285         if (s == NULL)
2286                 RETURN(-ENOMEM);
2287
2288         md_device_init(&m->mdt_md_dev, t);
2289         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2290         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2291
2292         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2293         if (rc) {
2294                 CERROR("can't init lu_site, rc %d\n", rc);
2295                 GOTO(err_free_site, rc);
2296         }
2297
2298         /* init the stack */
2299         rc = mdt_stack_init(ctx, m, cfg);
2300         if (rc) {
2301                 CERROR("can't init device stack, rc %d\n", rc);
2302                 GOTO(err_fini_site, rc);
2303         }
2304         /* set server index */
2305         LASSERT(num);
2306         s->ls_node_id = simple_strtol(num, NULL, 10);
2307
2308         rc = mdt_fld_init(ctx, obd->obd_name, m);
2309         if (rc)
2310                 GOTO(err_fini_stack, rc);
2311
2312         rc = mdt_seq_init(ctx, obd->obd_name, m);
2313         if (rc)
2314                 GOTO(err_fini_fld, rc);
2315
2316         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
2317                  LUSTRE_MDT0_NAME"-%p", m);
2318         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
2319                                               LDLM_NAMESPACE_SERVER);
2320         if (m->mdt_namespace == NULL)
2321                 GOTO(err_fini_seq, rc = -ENOMEM);
2322
2323         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2324
2325         rc = mdt_start_ptlrpc_service(m);
2326         if (rc)
2327                 GOTO(err_free_ns, rc);
2328
2329         ping_evictor_start();
2330         rc = mdt_fs_setup(ctx, m);
2331         if (rc)
2332                 GOTO(err_stop_service, rc);
2333         RETURN(0);
2334
2335 err_stop_service:
2336         mdt_stop_ptlrpc_service(m);
2337 err_free_ns:
2338         ldlm_namespace_free(m->mdt_namespace, 0);
2339         m->mdt_namespace = NULL;
2340 err_fini_seq:
2341         mdt_seq_fini(ctx, m);
2342 err_fini_fld:
2343         mdt_fld_fini(ctx, m);
2344 err_fini_stack:
2345         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2346 err_fini_site:
2347         lu_site_fini(s);
2348 err_free_site:
2349         OBD_FREE_PTR(s);
2350         md_device_fini(&m->mdt_md_dev);
2351         return (rc);
2352 }
2353
2354 /* used by MGS to process specific configurations */
2355 static int mdt_process_config(const struct lu_context *ctx,
2356                               struct lu_device *d, struct lustre_cfg *cfg)
2357 {
2358         struct mdt_device *m = mdt_dev(d);
2359         struct md_device *md_next  = m->mdt_child;
2360         struct lu_device *next = md2lu_dev(md_next);
2361         int err;
2362         ENTRY;
2363
2364         switch (cfg->lcfg_command) {
2365         case LCFG_ADD_MDC:
2366                 /*
2367                  * Add mdc hook to get first MDT uuid and connect it to
2368                  * ls->controller to use for seq manager.
2369                  */
2370                 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
2371                 if (err) {
2372                         CERROR("can't initialize controller export, "
2373                                "rc %d\n", err);
2374                 }
2375         default:
2376                 /* others are passed further */
2377                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
2378                 break;
2379         }
2380         RETURN(err);
2381 }
2382
2383 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2384                                           const struct lu_object_header *hdr,
2385                                           struct lu_device *d)
2386 {
2387         struct mdt_object *mo;
2388
2389         ENTRY;
2390
2391         OBD_ALLOC_PTR(mo);
2392         if (mo != NULL) {
2393                 struct lu_object *o;
2394                 struct lu_object_header *h;
2395
2396                 o = &mo->mot_obj.mo_lu;
2397                 h = &mo->mot_header;
2398                 lu_object_header_init(h);
2399                 lu_object_init(o, h, d);
2400                 lu_object_add_top(h, o);
2401                 o->lo_ops = &mdt_obj_ops;
2402                 RETURN(o);
2403         } else
2404                 RETURN(NULL);
2405 }
2406
2407 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2408 {
2409         struct mdt_device *d = mdt_dev(o->lo_dev);
2410         struct lu_device  *under;
2411         struct lu_object  *below;
2412         int                rc = 0;
2413         ENTRY;
2414
2415         CDEBUG(D_INODE, "object init, fid = "DFID3"\n",
2416                PFID3(lu_object_fid(o)));
2417
2418         under = &d->mdt_child->md_lu_dev;
2419         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2420         if (below != NULL) {
2421                 lu_object_add(o, below);
2422         } else
2423                 rc = -ENOMEM;
2424         RETURN(rc);
2425 }
2426
2427 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2428 {
2429         struct mdt_object *mo = mdt_obj(o);
2430         struct lu_object_header *h;
2431         ENTRY;
2432
2433         h = o->lo_header;
2434         CDEBUG(D_INODE, "object free, fid = "DFID3"\n",
2435                PFID3(lu_object_fid(o)));
2436
2437         lu_object_fini(o);
2438         lu_object_header_fini(h);
2439         OBD_FREE_PTR(mo);
2440         EXIT;
2441 }
2442
2443 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
2444                             lu_printer_t p, const struct lu_object *o)
2445 {
2446         return (*p)(ctxt, cookie, LUSTRE_MDT0_NAME"-object@%p", o);
2447 }
2448
/*
 * Existence of an MDT object is decided by the layer below it: the answer
 * is whatever lu_object_exists() reports for lu_object_next(@o).
 */
int mdt_object_exists(const struct lu_context *ctx,
                      const struct lu_object *o)
{
        const struct lu_object *below = lu_object_next(o);

        return lu_object_exists(ctx, below);
}
2454
2455 static struct lu_device_operations mdt_lu_ops = {
2456         .ldo_object_alloc   = mdt_object_alloc,
2457         .ldo_process_config = mdt_process_config
2458 };
2459
2460 static struct lu_object_operations mdt_obj_ops = {
2461         .loo_object_init    = mdt_object_init,
2462         .loo_object_free    = mdt_object_free,
2463         .loo_object_print   = mdt_object_print,
2464         .loo_object_exists  = mdt_object_exists
2465 };
2466
2467 /* mds_connect_internal */
2468 static int mdt_connect0(struct mdt_device *mdt,
2469                         struct obd_export *exp, struct obd_connect_data *data)
2470 {
2471         if (data != NULL) {
2472                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2473                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2474
2475                 /* If no known bits (which should not happen, probably,
2476                    as everybody should support LOOKUP and UPDATE bits at least)
2477                    revert to compat mode with plain locks. */
2478                 if (!data->ocd_ibits_known &&
2479                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
2480                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2481
2482                 if (!mdt->mdt_opts.mo_acl)
2483                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2484
2485                 if (!mdt->mdt_opts.mo_user_xattr)
2486                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2487
2488                 exp->exp_connect_flags = data->ocd_connect_flags;
2489                 data->ocd_version = LUSTRE_VERSION_CODE;
2490                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
2491         }
2492
2493         if (mdt->mdt_opts.mo_acl &&
2494             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2495                 CWARN("%s: MDS requires ACL support but client does not\n",
2496                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2497                 return -EBADE;
2498         }
2499         return 0;
2500 }
2501
2502 /* mds_connect copy */
2503 static int mdt_obd_connect(const struct lu_context *ctx,
2504                            struct lustre_handle *conn, struct obd_device *obd,
2505                            struct obd_uuid *cluuid,
2506                            struct obd_connect_data *data)
2507 {
2508         struct obd_export      *exp;
2509         struct mdt_device      *mdt;
2510         struct mdt_export_data *med;
2511         struct mdt_client_data *mcd;
2512         int                     rc;
2513         ENTRY;
2514
2515         LASSERT(ctx != NULL);
2516         if (!conn || !obd || !cluuid)
2517                 RETURN(-EINVAL);
2518
2519         mdt = mdt_dev(obd->obd_lu_dev);
2520
2521         rc = class_connect(conn, obd, cluuid);
2522         if (rc)
2523                 RETURN(rc);
2524
2525         exp = class_conn2export(conn);
2526         LASSERT(exp != NULL);
2527         med = &exp->exp_mdt_data;
2528
2529         rc = mdt_connect0(mdt, exp, data);
2530         if (rc == 0) {
2531                 OBD_ALLOC_PTR(mcd);
2532                 if (mcd != NULL) {
2533                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2534                         med->med_mcd = mcd;
2535                         /*
2536                          * rc = mdt_client_add(ctx, mdt, med, -1);
2537                          */
2538                         if (rc != 0)
2539                                 OBD_FREE_PTR(mcd);
2540                 } else
2541                         rc = -ENOMEM;
2542         }
2543         if (rc != 0)
2544                 class_disconnect(exp);
2545         else
2546                 class_export_put(exp);
2547
2548         RETURN(rc);
2549 }
2550
2551 static int mdt_obd_disconnect(struct obd_export *exp)
2552 {
2553         unsigned long irqflags;
2554         int rc;
2555         ENTRY;
2556
2557         LASSERT(exp);
2558         class_export_get(exp);
2559
2560         /* Disconnect early so that clients can't keep using export */
2561         rc = class_disconnect(exp);
2562         //ldlm_cancel_locks_for_export(exp);
2563
2564         /* complete all outstanding replies */
2565         spin_lock_irqsave(&exp->exp_lock, irqflags);
2566         while (!list_empty(&exp->exp_outstanding_replies)) {
2567                 struct ptlrpc_reply_state *rs =
2568                         list_entry(exp->exp_outstanding_replies.next,
2569                                    struct ptlrpc_reply_state, rs_exp_list);
2570                 struct ptlrpc_service *svc = rs->rs_service;
2571
2572                 spin_lock(&svc->srv_lock);
2573                 list_del_init(&rs->rs_exp_list);
2574                 ptlrpc_schedule_difficult_reply(rs);
2575                 spin_unlock(&svc->srv_lock);
2576         }
2577         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
2578
2579         class_export_put(exp);
2580         RETURN(rc);
2581 }
2582
2583 /* FIXME: Can we avoid using these two interfaces? */
2584 static int mdt_init_export(struct obd_export *exp)
2585 {
2586         struct mdt_export_data *med = &exp->exp_mdt_data;
2587         ENTRY;
2588
2589         INIT_LIST_HEAD(&med->med_open_head);
2590         spin_lock_init(&med->med_open_lock);
2591         exp->exp_connecting = 1;
2592         RETURN(0);
2593 }
2594
2595 static int mdt_destroy_export(struct obd_export *export)
2596 {
2597         struct mdt_export_data *med;
2598         struct obd_device *obd = export->exp_obd;
2599         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2600         struct lu_context ctxt;
2601         int rc = 0;
2602         ENTRY;
2603
2604         med = &export->exp_mdt_data;
2605         target_destroy_export(export);
2606
2607         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
2608                 RETURN(0);
2609
2610         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
2611         if (rc)
2612                 RETURN(rc);
2613
2614         lu_context_enter(&ctxt);
2615         /* Close any open files (which may also cause orphan unlinking). */
2616         spin_lock(&med->med_open_lock);
2617         while (!list_empty(&med->med_open_head)) {
2618                 struct list_head *tmp = med->med_open_head.next;
2619                 struct mdt_file_data *mfd =
2620                         list_entry(tmp, struct mdt_file_data, mfd_list);
2621
2622                 /* Remove mfd handle so it can't be found again.
2623                  * We are consuming the mfd_list reference here. */
2624                 class_handle_unhash(&mfd->mfd_handle);
2625                 list_del_init(&mfd->mfd_list);
2626                 spin_unlock(&med->med_open_lock);
2627                 mdt_mfd_close(&ctxt, mdt, mfd);
2628                 spin_lock(&med->med_open_lock);
2629         }
2630         spin_unlock(&med->med_open_lock);
2631         mdt_client_free(&ctxt, mdt, med);
2632
2633         lu_context_exit(&ctxt);
2634         lu_context_fini(&ctxt);
2635
2636         RETURN(rc);
2637 }
2638
2639 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
2640                       enum md_upcall_event ev)
2641 {
2642         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
2643         struct md_device  *next  = m->mdt_child;
2644         int rc = 0;
2645         ENTRY;
2646
2647         switch (ev) {
2648                 case MD_LOV_SYNC:
2649                         rc = next->md_ops->mdo_get_maxsize(ctx, next,
2650                                     &m->mdt_max_mdsize, &m->mdt_max_cookiesize);
2651                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d \n",
2652                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
2653                         break;
2654                 default:
2655                         CERROR("invalid event\n");
2656                         rc = -EINVAL;
2657                         break;
2658         }
2659         RETURN(rc);
2660 }
2661
2662
/*
 * obd-level methods of the mdt: connection handling plus the per-export
 * constructor and destructor hooks.  Handed to class_register_type() in
 * mdt_mod_init().
 */
static struct obd_ops mdt_obd_device_ops = {
        .o_owner          = THIS_MODULE,
        .o_connect        = mdt_obd_connect,
        .o_disconnect     = mdt_obd_disconnect,
        .o_init_export    = mdt_init_export,    /* added by Huang Hua */
        .o_destroy_export = mdt_destroy_export, /* added by Huang Hua */
};
2670
/*
 * ->ldto_device_free() method: finalizes the mdt device with mdt_fini() and
 * then releases the memory of the enclosing struct mdt_device.
 */
static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
{
        struct mdt_device *mdt;

        mdt = mdt_dev(d);
        mdt_fini(ctx, mdt);
        OBD_FREE_PTR(mdt);
}
2678
2679 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
2680                                           struct lu_device_type *t,
2681                                           struct lustre_cfg *cfg)
2682 {
2683         struct lu_device  *l;
2684         struct mdt_device *m;
2685
2686         OBD_ALLOC_PTR(m);
2687         if (m != NULL) {
2688                 int result;
2689
2690                 l = &m->mdt_md_dev.md_lu_dev;
2691                 result = mdt_init0(ctx, m, t, cfg);
2692                 if (result != 0) {
2693                         OBD_FREE_PTR(m);
2694                         l = ERR_PTR(result);
2695                 }
2696                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
2697         } else
2698                 l = ERR_PTR(-ENOMEM);
2699         return l;
2700 }
2701
2702 /*
2703  * context key constructor/destructor
2704  */
2705
2706 static void *mdt_thread_init(const struct lu_context *ctx,
2707                              struct lu_context_key *key)
2708 {
2709         struct mdt_thread_info *info;
2710
2711         /*
2712          * check that no high order allocations are incurred.
2713          */
2714         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
2715         OBD_ALLOC_PTR(info);
2716         if (info == NULL)
2717                 info = ERR_PTR(-ENOMEM);
2718         return info;
2719 }
2720
/*
 * Destructor of the mdt_thread_key value: frees the struct mdt_thread_info
 * passed in as data.
 */
static void mdt_thread_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *thread_info;

        /* OBD_FREE_PTR() is given a typed pointer rather than the void *. */
        thread_info = data;
        OBD_FREE_PTR(thread_info);
}
2727
/*
 * Context key tagged LCT_MD_THREAD whose value is a struct mdt_thread_info,
 * created by mdt_thread_init() and destroyed by mdt_thread_fini().
 * Registered in mdt_type_init().
 */
struct lu_context_key mdt_thread_key = {
        .lct_tags = LCT_MD_THREAD,
        .lct_init = mdt_thread_init,
        .lct_fini = mdt_thread_fini
};
2733
2734 static void *mdt_txn_init(const struct lu_context *ctx,
2735                              struct lu_context_key *key)
2736 {
2737         struct mdt_txn_info *txi;
2738
2739         /*
2740          * check that no high order allocations are incurred.
2741          */
2742         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
2743         OBD_ALLOC_PTR(txi);
2744         if (txi == NULL)
2745                 txi = ERR_PTR(-ENOMEM);
2746         return txi;
2747 }
2748
/*
 * Destructor of the mdt_txn_key value: frees the struct mdt_txn_info passed
 * in as data.
 */
static void mdt_txn_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txn_info;

        /* OBD_FREE_PTR() is given a typed pointer rather than the void *. */
        txn_info = data;
        OBD_FREE_PTR(txn_info);
}
2755
/*
 * Context key tagged LCT_TX_HANDLE whose value is a struct mdt_txn_info,
 * created by mdt_txn_init() and destroyed by mdt_txn_fini().
 * Registered in mdt_type_init().
 */
struct lu_context_key mdt_txn_key = {
        .lct_tags = LCT_TX_HANDLE,
        .lct_init = mdt_txn_init,
        .lct_fini = mdt_txn_fini
};
2761
2762
2763 static int mdt_type_init(struct lu_device_type *t)
2764 {
2765         int rc;
2766
2767         rc = lu_context_key_register(&mdt_thread_key);
2768         if (rc == 0)
2769                 rc = lu_context_key_register(&mdt_txn_key);
2770         return rc;
2771 }
2772
2773 static void mdt_type_fini(struct lu_device_type *t)
2774 {
2775         lu_context_key_degister(&mdt_thread_key);
2776         lu_context_key_degister(&mdt_txn_key);
2777 }
2778
/*
 * Methods of the mdt device type: type-wide setup and teardown (context key
 * registration) and allocation and release of individual mdt devices.
 */
static struct lu_device_type_operations mdt_device_type_ops = {
        .ldto_init = mdt_type_init,
        .ldto_fini = mdt_type_fini,

        .ldto_device_alloc = mdt_device_alloc,
        .ldto_device_free  = mdt_device_free
};
2786
/*
 * Device type descriptor of the mdt, named LUSTRE_MDT0_NAME and tagged as a
 * metadata device.  Passed to class_register_type() in mdt_mod_init().
 */
static struct lu_device_type mdt_device_type = {
        .ldt_tags     = LU_DEVICE_MD,
        .ldt_name     = LUSTRE_MDT0_NAME,
        .ldt_ops      = &mdt_device_type_ops,
        .ldt_ctx_tags = LCT_MD_THREAD
};
2793
/* Per-obd lprocfs variables of the mdt; the table has no real entries yet. */
static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
        { 0 }
};

/* Module-wide lprocfs variables of the mdt; no real entries yet either. */
static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
        { 0 }
};

/* Ties both tables to the name "mdt" for lprocfs_init_vars(mdt, ...). */
LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
2803
2804 static int __init mdt_mod_init(void)
2805 {
2806         int result;
2807         struct lprocfs_static_vars lvars;
2808
2809         mdt_num_threads = MDT_NUM_THREADS;
2810         lprocfs_init_vars(mdt, &lvars);
2811         result = class_register_type(&mdt_obd_device_ops, NULL,
2812                                      lvars.module_vars, LUSTRE_MDT0_NAME,
2813                                      &mdt_device_type);
2814         return result;
2815 }
2816
2817 static void __exit mdt_mod_exit(void)
2818 {
2819         class_unregister_type(LUSTRE_MDT0_NAME);
2820 }
2821
2822
/*
 * Build one designated initializer of a struct mdt_handler array.
 *
 * The entry lands at index (prefix_opc - prefix_base), i.e. the table is
 * indexed by the opcode's offset from the first opcode of its group.
 *
 *   prefix - opcode family (MDS, OBD, LDLM), pasted in front of opc and base
 *   base   - first opcode of the family, without the prefix
 *   suffix - pasted after OBD_FAIL_<prefix>_<opc> to form the fail id
 *   flags  - handler flags stored in mh_flags
 *   opc    - opcode name without the prefix; also stringified into mh_name
 *   fn     - handler function stored in mh_act
 *   fmt    - request format stored in mh_fmt (NULL is used by the callers
 *            below when the format is not known)
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)             \
[prefix ## _ ## opc - prefix ## _ ## base] = {                          \
        .mh_name    = #opc,                                             \
        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
        .mh_opc     = prefix ## _  ## opc,                              \
        .mh_flags   = flags,                                            \
        .mh_act     = fn,                                               \
        .mh_fmt     = fmt                                               \
}
2832
/*
 * MDS flavours of DEF_HNDL(): opcodes are MDS_<name>, indexed relative to
 * MDS_GETATTR, with fail id OBD_FAIL_MDS_<name>_NET.
 *
 * Request with an explicitly given format.
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt)                                  \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
/*
 * Request with a format known in advance: &RQF_MDS_<name> is used.
 */
#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
/*
 * Request with a format we do not yet know: mh_fmt is left NULL.
 */
#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
2845
/*
 * Handlers of the MDS_* requests, indexed by (opcode - MDS_GETATTR).
 * Entries built with DEF_MDT_HNDL_F carry the request format RQF_MDS_<name>;
 * entries built with DEF_MDT_HNDL_0 have no format attached.
 */
static struct mdt_handler mdt_mds_ops[] = {
DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
DEF_MDT_HNDL_F(HABEO_CORPUS             , GETATTR,      mdt_getattr),
DEF_MDT_HNDL_F(HABEO_CORPUS             , GETATTR_NAME, mdt_getattr_name),
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
                                          SETXATTR,     mdt_setxattr),
DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
DEF_MDT_HNDL_F(0                        |MUTABOR,
                                          REINT,        mdt_reint),
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, CLOSE,        mdt_close),
DEF_MDT_HNDL_0(0,                         DONE_WRITING, mdt_done_writing),
DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_handle_quotacheck),
DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_handle_quotactl)
};
2865
/*
 * OBD flavour of DEF_HNDL(): opcodes are OBD_<name>, indexed relative to
 * OBD_PING, with fail id OBD_FAIL_OBD_<name>_NET and no request format.
 */
#define DEF_OBD_HNDL(flags, name, fn)                   \
        DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)


/* Handlers of the OBD_* requests, indexed by (opcode - OBD_PING). */
static struct mdt_handler mdt_obd_ops[] = {
        DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
        DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
        DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
};
2875
/*
 * LDLM flavours of DEF_HNDL(): opcodes are LDLM_<name>, indexed relative to
 * LDLM_ENQUEUE.  The suffix argument is empty, so the fail id is plain
 * OBD_FAIL_LDLM_<name>.  The _0 variant attaches no request format, the _F
 * variant attaches &RQF_LDLM_<name>.
 */
#define DEF_DLM_HNDL_0(flags, name, fn)                   \
        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
#define DEF_DLM_HNDL_F(flags, name, fn)                   \
        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)

/* Handlers of the LDLM_* requests, indexed by (opcode - LDLM_ENQUEUE). */
static struct mdt_handler mdt_dlm_ops[] = {
        DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
        DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
        DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
        DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
};
2887
/*
 * Handlers of the LLOG_* requests.  The table is still empty: no llog
 * handler is defined here, although mdt_handlers[] already refers to it.
 */
static struct mdt_handler mdt_llog_ops[] = {
};
2890
/*
 * Opcode slices of the mdt: each entry pairs an opcode range
 * (mos_opc_start, mos_opc_end) with the handler table covering it.
 * The final entry has mos_hs == NULL -- presumably the end-of-array marker
 * for whoever walks the slices; confirm against the lookup code.
 */
static struct mdt_opc_slice mdt_handlers[] = {
        {
                .mos_opc_start = MDS_GETATTR,
                .mos_opc_end   = MDS_LAST_OPC,
                .mos_hs        = mdt_mds_ops
        },
        {
                .mos_opc_start = OBD_PING,
                .mos_opc_end   = OBD_LAST_OPC,
                .mos_hs        = mdt_obd_ops
        },
        {
                .mos_opc_start = LDLM_ENQUEUE,
                .mos_opc_end   = LDLM_LAST_OPC,
                .mos_hs        = mdt_dlm_ops
        },
        {
                .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
                .mos_opc_end   = LLOG_LAST_OPC,
                .mos_hs        = mdt_llog_ops
        },
        {
                .mos_hs        = NULL
        }
};
2916
/*
 * Handler table holding only MDS_READPAGE, kept apart from mdt_mds_ops[].
 * Like that table it is indexed by (opcode - MDS_GETATTR), so the single
 * entry sits at index (MDS_READPAGE - MDS_GETATTR).
 */
static struct mdt_handler mdt_mds_readpage_ops[] = {
        DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
};
2920
/*
 * Opcode slices for readpage requests: one slice spanning the MDS opcode
 * range and backed by mdt_mds_readpage_ops[], followed by an entry with
 * mos_hs == NULL (presumably the end-of-array marker; confirm against the
 * lookup code).
 */
static struct mdt_opc_slice mdt_readpage_handlers[] = {
        {
                .mos_opc_start = MDS_GETATTR,
                .mos_opc_end   = MDS_LAST_OPC,
                .mos_hs        = mdt_mds_readpage_ops
        },
        {
                .mos_hs        = NULL
        }
};
2931
/* Module metadata. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT0_NAME")");
MODULE_LICENSE("GPL");

/* Module parameter mdt_num_threads (unsigned long, permissions 0444). */
CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
                "number of mdt service threads to start");

/* Declares the module "mdt", version "0.2.0", with its init/exit hooks. */
cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);