Whamcloud - gitweb
- fix defect in mdt_postrecov() with calling wrong lu_device operation
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * MDT_FAIL_CHECK
46  */
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
53 #include <obd.h>
54 /* lu2dt_dev() */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
/*
 * Module-wide thread count setting for the MDT.
 * Initialized in mdt_mod_init().
 */
unsigned long mdt_num_threads;
64
/*
 * Descriptor of a single ptlrpc request handler for the MDT.
 *
 * Handlers are grouped into several slices (struct mdt_opc_slice), and the
 * slices are stored in arrays such as mdt_regular_handlers[].
 */
struct mdt_handler {
        /* The name of this handler. */
        const char *mh_name;
        /* Fail id for this handler, checked at the beginning of this handler*/
        int         mh_fail_id;
        /* Operation code for this handler */
        __u32       mh_opc;
        /* flags are listed in enum mdt_handler_flags below. */
        __u32       mh_flags;
        /* The actual handler function to execute. */
        int (*mh_act)(struct mdt_thread_info *info);
        /* Request format for this request. */
        const struct req_format *mh_fmt;
};
83
/*
 * Bit flags stored in mdt_handler::mh_flags. Each flag occupies its own bit
 * so that several of them can be OR-ed together.
 */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         */
        HABEO_CORPUS = 0x1,
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = 0x2,
        /*
         * "habeo refero" == "I have a reply"
         *
         * The request has a fixed reply format, so the reply message can be
         * packed by generic code.
         */
        HABEO_REFERO = 0x4,
        /*
         * "mutabor" == "I shall modify"
         *
         * The request will modify something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client asap.
         */
        MUTABOR      = 0x8
};
113
/*
 * One slice of the handler table: a range of operation codes together with
 * the handlers serving that range.
 */
struct mdt_opc_slice {
        /* First opcode of the slice. */
        __u32               mos_opc_start;
        /*
         * End of the opcode range.
         * NOTE(review): whether the bound is inclusive or exclusive is not
         * visible here -- confirm against the lookup code.
         */
        int                 mos_opc_end;
        /* Handlers of this slice. */
        struct mdt_handler *mos_hs;
};
119
/* Handler slice tables; forward-declared so earlier code can refer to them. */
static struct mdt_opc_slice mdt_regular_handlers[];
static struct mdt_opc_slice mdt_readpage_handlers[];
static struct mdt_opc_slice mdt_seq_handlers[];
static struct mdt_opc_slice mdt_fld_handlers[];

/* Forward declarations of helpers used before their definitions. */
static struct mdt_device *mdt_dev(struct lu_device *d);
static int mdt_regular_handle(struct ptlrpc_request *req);
static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);

/* Object operations of MDT objects (forward declaration). */
static struct lu_object_operations mdt_obj_ops;
130
131 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
132 {
133         if (!rep)
134                 return 0;
135         return (rep->lock_policy_res1 & flag);
136 }
137
138 void mdt_set_disposition(struct mdt_thread_info *info,
139                                 struct ldlm_reply *rep, int flag)
140 {
141         if (info)
142                 info->mti_opdata |= flag;
143         if (rep)
144                 rep->lock_policy_res1 |= flag;
145 }
146
147
148 static int mdt_getstatus(struct mdt_thread_info *info)
149 {
150         struct md_device *next  = info->mti_mdt->mdt_child;
151         int               rc;
152         struct mdt_body  *body;
153
154         ENTRY;
155
156         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
157                 rc = -ENOMEM;
158         else {
159                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
160                 rc = next->md_ops->mdo_root_get(info->mti_ctxt,
161                                                 next, &body->fid1);
162                 if (rc == 0)
163                         body->valid |= OBD_MD_FLID;
164         }
165
166         RETURN(rc);
167 }
168
169 static int mdt_statfs(struct mdt_thread_info *info)
170 {
171         struct md_device  *next  = info->mti_mdt->mdt_child;
172         struct obd_statfs *osfs;
173         int                rc;
174
175         ENTRY;
176
177         /* This will trigger a watchdog timeout */
178         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
179                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
180
181
182         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
183                 rc = -ENOMEM;
184         } else {
185                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
186                 /* XXX max_age optimisation is needed here. See mds_statfs */
187                 rc = next->md_ops->mdo_statfs(info->mti_ctxt,
188                                               next, &info->mti_u.ksfs);
189                 statfs_pack(osfs, &info->mti_u.ksfs);
190         }
191
192         RETURN(rc);
193 }
194
195 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
196                         const struct lu_fid *fid)
197 {
198         /*XXX should pack the reply body according to lu_valid*/
199         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
200                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
201                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
202                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
203
204         if (!S_ISREG(attr->la_mode))
205                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
206
207         b->atime      = attr->la_atime;
208         b->mtime      = attr->la_mtime;
209         b->ctime      = attr->la_ctime;
210         b->mode       = attr->la_mode;
211         b->size       = attr->la_size;
212         b->blocks     = attr->la_blocks;
213         b->uid        = attr->la_uid;
214         b->gid        = attr->la_gid;
215         b->flags      = attr->la_flags;
216         b->nlink      = attr->la_nlink;
217         b->rdev       = attr->la_rdev;
218
219         if (fid) {
220                 b->fid1 = *fid;
221                 b->valid |= OBD_MD_FLID;
222                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
223                                 PFID(fid), b->nlink, b->mode, b->size);
224         }
225 }
226
227 static inline int mdt_body_has_lov(const struct lu_attr *la,
228                                    const struct mdt_body *body)
229 {
230         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
231                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
232 }
233
234 static int mdt_getattr_internal(struct mdt_thread_info *info,
235                                 struct mdt_object *o)
236 {
237         struct md_object        *next = mdt_object_child(o);
238         const struct mdt_body   *reqbody = info->mti_body;
239         struct ptlrpc_request   *req = mdt_info_req(info);
240         struct md_attr          *ma = &info->mti_attr;
241         struct lu_attr          *la = &ma->ma_attr;
242         struct req_capsule      *pill = &info->mti_pill;
243         const struct lu_context *ctxt = info->mti_ctxt;
244         struct mdt_body         *repbody;
245         void                    *buffer;
246         int                     length;
247         int                     rc;
248         ENTRY;
249
250         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
251                 RETURN(-ENOMEM);
252
253         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
254         repbody->eadatasize = 0;
255         repbody->aclsize = 0;
256
257         ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
258         ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
259
260         ma->ma_need = MA_INODE | MA_LOV;
261         rc = mo_attr_get(ctxt, next, ma);
262         if (rc == -EREMOTE) {
263                 /* This object is located on remote node.*/
264                 repbody->fid1 = *mdt_object_fid(o);
265                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
266                 RETURN(0);
267         } else if (rc){
268                 CERROR("getattr error for "DFID": %d\n",
269                         PFID(mdt_object_fid(o)), rc);
270                 RETURN(rc);
271         }
272
273         if (ma->ma_valid & MA_INODE)
274                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
275         else
276                 RETURN(-EFAULT);
277
278         if (mdt_body_has_lov(la, reqbody)) {
279                 if (ma->ma_valid & MA_LOV) {
280                         LASSERT(ma->ma_lmm_size);
281                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
282                         repbody->eadatasize = ma->ma_lmm_size;
283                         if (S_ISDIR(la->la_mode))
284                                 repbody->valid |= OBD_MD_FLDIREA;
285                         else
286                                 repbody->valid |= OBD_MD_FLEASIZE;
287                 }
288         } else if (S_ISLNK(la->la_mode) &&
289                           reqbody->valid & OBD_MD_LINKNAME) {
290                 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
291                 if (rc <= 0) {
292                         CERROR("readlink failed: %d\n", rc);
293                         rc = -EFAULT;
294                 } else {
295                         repbody->valid |= OBD_MD_LINKNAME;
296                         repbody->eadatasize = rc + 1;
297                         ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
298                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
299                                         (char*)ma->ma_lmm, rc);
300                         rc = 0;
301                 }
302         }
303
304         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
305                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
306                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
307                 repbody->valid |= OBD_MD_FLMODEASIZE;
308                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
309                                 "MAX_COOKIE to : %d:%d\n",
310                                 repbody->max_mdsize,
311                                 repbody->max_cookiesize);
312         }
313
314 #ifdef CONFIG_FS_POSIX_ACL
315         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
316             (reqbody->valid & OBD_MD_FLACL)) {
317                 buffer = req_capsule_server_get(pill, &RMF_ACL);
318                 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
319                 if (length > 0) {
320                         rc = mo_xattr_get(ctxt, next, buffer,
321                                           length, XATTR_NAME_ACL_ACCESS);
322                         if (rc < 0) {
323                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
324                                         rc = 0;
325                                 else
326                                         CERROR("got acl size: %d\n", rc);
327                         } else {
328                                 repbody->aclsize = rc;
329                                 repbody->valid |= OBD_MD_FLACL;
330                         }
331                 }
332         }
333 #endif
334
335         RETURN(rc);
336 }
337
338 static int mdt_getattr(struct mdt_thread_info *info)
339 {
340         int rc;
341         struct mdt_object *obj;
342
343         obj = info->mti_object;
344         LASSERT(obj != NULL);
345         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
346         ENTRY;
347
348         rc = mdt_getattr_internal(info, obj);
349         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
350         RETURN(rc);
351 }
352
/*
 * Look up a child by name under info->mti_object, lock it and pack its
 * attributes into the reply.
 *
 * An UPDATE lock is taken on the parent and is released before returning.
 * A @child_bits lock is taken on the child through @lhc and, on success, is
 * left held for the caller to dispose of:
 *            (1) a normal request should release the child lock;
 *            (2) an intent request will grant the lock to the client.
 * On failure of the final getattr the child lock is dropped here.
 *
 * An empty name means "getattr on mti_object itself": no parent lock and no
 * lookup are performed in that case.
 *
 * @ldlm_rep may be NULL; when present, lookup dispositions are recorded in it.
 *
 * Returns 0 on success or a negative errno.
 */
static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                                 struct mdt_lock_handle *lhc,
                                 __u64 child_bits,
                                 struct ldlm_reply *ldlm_rep)
{
        struct mdt_object *parent = info->mti_object;
        struct mdt_object *child;
        struct md_object  *next = mdt_object_child(info->mti_object);
        /* scratch fid in the thread info receives the lookup result */
        struct lu_fid     *child_fid = &info->mti_tmp_fid1;
        const char        *name;
        int               rc;
        struct mdt_lock_handle *lhp;
        ENTRY;

        LASSERT(info->mti_object != NULL);
        name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
        if (name == NULL)
                RETURN(-EFAULT);

        CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
                        PFID(mdt_object_fid(parent)), name, ldlm_rep);

        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
        if (strlen(name) == 0) {
                /* only getattr on the child. parent is on another node. */
                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
                child = parent;
                CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
                               ", ldlm_rep=%p\n",
                               PFID(mdt_object_fid(child)), ldlm_rep);

                mdt_lock_handle_init(lhc);
                lhc->mlh_mode = LCK_CR;
                rc = mdt_object_lock(info, child, lhc, child_bits);
                if (rc == 0) {
                        /* finally, we can get attr for child. */
                        rc = mdt_getattr_internal(info, child);
                        if (rc != 0)
                                mdt_object_unlock(info, child, lhc, 1);
                }
                /* no parent lock was taken on this path: skip out_parent */
                GOTO(out, rc);
        }

        /*step 1: lock parent */
        lhp = &info->mti_lh[MDT_LH_PARENT];
        lhp->mlh_mode = LCK_CR;
        rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
        if (rc != 0)
                RETURN(rc);

        /*step 2: lookup child's fid by name */
        rc = mdo_lookup(info->mti_ctxt, next, name, child_fid);
        if (rc != 0) {
                /* only a clean "not found" is reported as negative lookup */
                if (rc == -ENOENT)
                        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
                GOTO(out_parent, rc);
        } else
                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
        /*
         *step 3: find the child object by fid & lock it.
         *        regardless if it is local or remote.
         */
        mdt_lock_handle_init(lhc);
        lhc->mlh_mode = LCK_CR;
        child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
        if (IS_ERR(child))
                GOTO(out_parent, rc = PTR_ERR(child));

        /* finally, we can get attr for child. */
        rc = mdt_getattr_internal(info, child);
        if (rc != 0)
                mdt_object_unlock(info, child, lhc, 1);
        else {
                /* This is pure debugging code. */
                struct ldlm_lock *lock;
                struct ldlm_res_id *res_id;
                lock = ldlm_handle2lock(&lhc->mlh_lh);
                if (lock) {
                        /* the lock's resource name must match the child fid */
                        res_id = &lock->l_resource->lr_name;
                        LDLM_DEBUG(lock, "we will return this lock client\n");
                        LASSERTF(fid_res_name_eq(mdt_object_fid(child),
                                                 &lock->l_resource->lr_name),
                                "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
                                (unsigned long)res_id->name[0],
                                (unsigned long)res_id->name[1],
                                (unsigned long)res_id->name[2],
                                PFID(mdt_object_fid(child)));
                        LDLM_LOCK_PUT(lock);
                }
        }
        /* drop the object reference obtained by mdt_object_find_lock() */
        mdt_object_put(info->mti_ctxt, child);

        EXIT;
out_parent:
        mdt_object_unlock(info, parent, lhp, 1);
out:
        return rc;
}
457
458 /* normal handler: should release the child lock */
459 static int mdt_getattr_name(struct mdt_thread_info *info)
460 {
461         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
462         int rc;
463
464         ENTRY;
465
466         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
467         if (lustre_handle_is_used(&lhc->mlh_lh)) {
468                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
469                 lhc->mlh_lh.cookie = 0;
470         }
471         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
472         RETURN(rc);
473 }
474
475 static struct lu_device_operations mdt_lu_ops;
476
477 static int lu_device_is_mdt(struct lu_device *d)
478 {
479         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
480 }
481
482 static inline struct mdt_device *mdt_dev(struct lu_device *d)
483 {
484         LASSERT(lu_device_is_mdt(d));
485         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
486 }
487
488 static int mdt_connect(struct mdt_thread_info *info)
489 {
490         int rc;
491         struct ptlrpc_request *req;
492
493         req = mdt_info_req(info);
494         rc = target_handle_connect(req, mdt_regular_handle);
495         if (rc == 0) {
496                 LASSERT(req->rq_export != NULL);
497                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
498         }
499         return rc;
500 }
501
/*
 * DISCONNECT handler: delegates to the generic target disconnect code and
 * returns its result.
 */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        struct ptlrpc_request *request = mdt_info_req(info);

        return target_handle_disconnect(request);
}
506
507 static int mdt_sendpage(struct mdt_thread_info *info,
508                         struct lu_rdpg *rdpg)
509 {
510         struct ptlrpc_request   *req = mdt_info_req(info);
511         struct ptlrpc_bulk_desc *desc;
512         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
513         int                      tmpcount;
514         int                      tmpsize;
515         int                      i;
516         int                      rc;
517         ENTRY;
518
519         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
520                                     MDS_BULK_PORTAL);
521         if (desc == NULL)
522                 GOTO(out, rc = -ENOMEM);
523
524         for (i = 0, tmpcount = rdpg->rp_count;
525                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
526                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
527                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
528         }
529
530         LASSERT(desc->bd_nob == rdpg->rp_count);
531         rc = ptlrpc_start_bulk_transfer(desc);
532         if (rc)
533                 GOTO(free_desc, rc);
534
535         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
536                 GOTO(abort_bulk, rc);
537
538         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
539         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
540         LASSERT (rc == 0 || rc == -ETIMEDOUT);
541
542         if (rc == 0) {
543                 if (desc->bd_success &&
544                     desc->bd_nob_transferred == rdpg->rp_count)
545                         GOTO(free_desc, rc);
546
547                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
548         }
549
550         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
551                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
552                   desc->bd_nob_transferred, rdpg->rp_count,
553                   req->rq_export->exp_client_uuid.uuid,
554                   req->rq_export->exp_connection->c_remote_uuid.uuid);
555
556         class_fail_export(req->rq_export);
557
558         EXIT;
559 abort_bulk:
560         ptlrpc_abort_bulk(desc);
561 free_desc:
562         ptlrpc_free_bulk(desc);
563 out:
564         return rc;
565 }
566
567 #ifdef HAVE_SPLIT_SUPPORT
568 /*
569  * Retrieve dir entry from the page and insert it to the
570  * slave object, actually, this should be in osd layer,
571  * but since it will not in the final product, so just do
572  * it here and do not define more moo api anymore for
573  * this.
574  */
575 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
576 {
577         struct mdt_object *object = info->mti_object;
578         struct lu_dirpage *dp;
579         struct lu_dirent *ent;
580         int rc = 0;
581
582         kmap(page);
583         dp = page_address(page);
584         for (ent = lu_dirent_start(dp); ent != NULL;
585                           ent = lu_dirent_next(ent)) {
586                 struct lu_fid *lf = &ent->lde_fid;
587
588                 /* FIXME: check isdir */
589                 rc = mdo_name_insert(info->mti_ctxt,
590                                      md_object_next(&object->mot_obj),
591                                      ent->lde_name, lf, 0);
592                 /* FIXME: add cross_flags */
593                 if (rc) {
594                         kunmap(page);
595                         RETURN(rc);
596                 }
597         }
598         kunmap(page);
599         RETURN(rc);
600 }
601
/*
 * Timeout callback passed to LWI_TIMEOUT_INTERVAL() by mdt_writepage().
 * @data is the bulk descriptor supplied there; it is not used.
 * Always returns 1.
 */
static int mdt_bulk_timeout(void *data)
{
        ENTRY;
        /* We don't fail the connection here, because having the export
         * killed makes the (vital) call to commitrw very sad.
         */
        RETURN(1);
}
610
611 static int mdt_writepage(struct mdt_thread_info *info)
612 {
613         struct ptlrpc_request   *req = mdt_info_req(info);
614         struct l_wait_info      *lwi;
615         struct ptlrpc_bulk_desc *desc;
616         struct page             *page;
617         int                rc;
618         ENTRY;
619
620         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
621         if (desc)
622                 RETURN(-ENOMEM);
623
624         /* allocate the page for the desc */
625         page = alloc_pages(GFP_KERNEL, 0);
626         if (!page)
627                 GOTO(desc_cleanup, rc = -ENOMEM);
628
629         ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
630
631         /* FIXME: following parts are copied from ost_brw_write */
632
633         /* Check if client was evicted while we were doing i/o before touching
634            network */
635         OBD_ALLOC_PTR(lwi);
636         if (!lwi)
637                 GOTO(cleanup_page, rc = -ENOMEM);
638
639         if (desc->bd_export->exp_failed)
640                 rc = -ENOTCONN;
641         else
642                 rc = ptlrpc_start_bulk_transfer (desc);
643         if (rc == 0) {
644                 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
645                                             mdt_bulk_timeout, desc);
646                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
647                                   desc->bd_export->exp_failed, lwi);
648                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
649                 if (rc == -ETIMEDOUT) {
650                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
651                         ptlrpc_abort_bulk(desc);
652                 } else if (desc->bd_export->exp_failed) {
653                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
654                         rc = -ENOTCONN;
655                         ptlrpc_abort_bulk(desc);
656                 } else if (!desc->bd_success ||
657                            desc->bd_nob_transferred != desc->bd_nob) {
658                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
659                                   desc->bd_success ?
660                                   "truncated" : "network error on",
661                                   desc->bd_nob_transferred, desc->bd_nob);
662                         /* XXX should this be a different errno? */
663                         rc = -ETIMEDOUT;
664                 }
665         } else {
666                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
667         }
668         if (rc)
669                 GOTO(cleanup_lwi, rc);
670         rc = mdt_write_dir_page(info, page);
671
672 cleanup_lwi:
673         OBD_FREE_PTR(lwi);
674 cleanup_page:
675         __free_pages(page, 0);
676 desc_cleanup:
677         ptlrpc_free_bulk(desc);
678         RETURN(rc);
679 }
680 #endif
681
682 static int mdt_readpage(struct mdt_thread_info *info)
683 {
684         struct mdt_object *object = info->mti_object;
685         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
686         struct mdt_body   *reqbody;
687         struct mdt_body   *repbody;
688         int                rc;
689         int                i;
690         ENTRY;
691
692         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
693                 RETURN(-ENOMEM);
694
695         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
696         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
697         if (reqbody == NULL || repbody == NULL)
698                 RETURN(-EFAULT);
699
700         /*
701          * prepare @rdpg before calling lower layers and transfer itself. Here
702          * reqbody->size contains offset of where to start to read and
703          * reqbody->nlink contains number bytes to read.
704          */
705         rdpg->rp_hash = reqbody->size;
706         if ((__u64)rdpg->rp_hash != reqbody->size) {
707                 CERROR("Invalid hash: %#llx != %#llx\n",
708                        (__u64)rdpg->rp_hash, reqbody->size);
709                 RETURN(-EFAULT);
710         }
711         rdpg->rp_count  = reqbody->nlink;
712         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
713         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
714         if (rdpg->rp_pages == NULL)
715                 RETURN(-ENOMEM);
716
717         for (i = 0; i < rdpg->rp_npages; ++i) {
718                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
719                 if (rdpg->rp_pages[i] == NULL)
720                         GOTO(free_rdpg, rc = -ENOMEM);
721         }
722
723         /* call lower layers to fill allocated pages with directory data */
724         rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
725         if (rc)
726                 GOTO(free_rdpg, rc);
727
728         /* send pages to client */
729         rc = mdt_sendpage(info, rdpg);
730
731         EXIT;
732 free_rdpg:
733         for (i = 0; i < rdpg->rp_npages; i++)
734                 if (rdpg->rp_pages[i] != NULL)
735                         __free_pages(rdpg->rp_pages[i], 0);
736         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
737
738         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
739
740         return rc;
741 }
742
743 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
744 {
745         struct req_capsule      *pill = &info->mti_pill;
746         struct mdt_device       *mdt = info->mti_mdt;
747         struct ptlrpc_request   *req = mdt_info_req(info);
748         int                      rc;
749         ENTRY;
750
751         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
752                 RETURN(-EFAULT);
753
754         rc = mdt_reint_unpack(info, op);
755         if (rc != 0)
756                 RETURN(rc);
757
758         /*pack reply*/
759         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
760                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
761                                      mdt->mdt_max_mdsize);
762         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
763                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
764                                      mdt->mdt_max_cookiesize);
765         rc = req_capsule_pack(pill);
766         if (rc != 0)
767                 RETURN(rc);
768
769         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
770                 struct mdt_client_data *mcd;
771
772                 mcd = req->rq_export->exp_mdt_data.med_mcd;
773                 if (mcd->mcd_last_xid == req->rq_xid) {
774                         mdt_reconstruct(info);
775                         RETURN(lustre_msg_get_status(req->rq_repmsg));
776                 }
777                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
778                                      mcd->mcd_last_xid);
779         }
780         rc = mdt_reint_rec(info);
781
782         RETURN(rc);
783 }
784
785 static long mdt_reint_opcode(struct mdt_thread_info *info,
786                              const struct req_format **fmt)
787 {
788         __u32 *ptr;
789         long opc;
790
791         opc = -EFAULT;
792         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
793         if (ptr != NULL) {
794                 opc = *ptr;
795                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
796                 if (opc < REINT_MAX && fmt[opc] != NULL)
797                         req_capsule_extend(&info->mti_pill, fmt[opc]);
798                 else
799                         CERROR("Unsupported opc: %ld\n", opc);
800         }
801         return opc;
802 }
803
804 static int mdt_reint(struct mdt_thread_info *info)
805 {
806         long opc;
807         int  rc;
808
809         static const struct req_format *reint_fmts[REINT_MAX] = {
810                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
811                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
812                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
813                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
814                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
815                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
816         };
817
818         ENTRY;
819
820         opc = mdt_reint_opcode(info, reint_fmts);
821         if (opc >= 0) {
822                 rc = mdt_reint_internal(info, opc);
823         } else
824                 rc = opc;
825
826         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
827         RETURN(rc);
828 }
829
830 /* TODO these two methods not available now. */
831
/*
 * Placeholder for syncing the whole device: no work is done yet and success
 * (0) is always reported.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        return 0;
}
837
/*
 * Placeholder for syncing a single object: no work is done yet and success
 * (0) is always reported.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        return 0;
}
843
844 static int mdt_sync(struct mdt_thread_info *info)
845 {
846         struct req_capsule *pill = &info->mti_pill;
847         struct mdt_body *body;
848         int rc;
849         ENTRY;
850
851         /* The fid may be zero, so we req_capsule_set manually */
852         req_capsule_set(pill, &RQF_MDS_SYNC);
853
854         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
855         if (body == NULL)
856                 RETURN(-EINVAL);
857
858         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
859                 RETURN(-ENOMEM);
860
861         if (fid_seq(&body->fid1) == 0) {
862                 /* sync the whole device */
863                 rc = req_capsule_pack(pill);
864                 if (rc == 0)
865                         rc = mdt_device_sync(info);
866         } else {
867                 /* sync an object */
868                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
869                 if (rc == 0) {
870                         rc = mdt_object_sync(info);
871                         if (rc == 0) {
872                                 struct md_object    *next;
873                                 const struct lu_fid *fid;
874                                 struct lu_attr      *la;
875
876                                 next = mdt_object_child(info->mti_object);
877                                 fid = mdt_object_fid(info->mti_object);
878                                 info->mti_attr.ma_need = MA_INODE;
879                                 rc = mo_attr_get(info->mti_ctxt, next,
880                                                  &info->mti_attr);
881                                 la = &info->mti_attr.ma_attr;
882                                 if (rc == 0) {
883                                         body = req_capsule_server_get(pill,
884                                                                 &RMF_MDT_BODY);
885                                         mdt_pack_attr2body(body, la, fid);
886                                 }
887                         }
888                 }
889         }
890         RETURN(rc);
891 }
892
/* Quotacheck is not supported: every request is answered with -EOPNOTSUPP. */
static int mdt_quotacheck_handle(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
897
/* Quotactl is not supported: every request is answered with -EOPNOTSUPP. */
static int mdt_quotactl_handle(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
902
903 /*
904  * OBD PING and other handlers.
905  */
906 static int mdt_obd_ping(struct mdt_thread_info *info)
907 {
908         int rc;
909         ENTRY;
910         rc = target_handle_ping(mdt_info_req(info));
911         RETURN(rc);
912 }
913
/* Log cancel is not supported: always answers -EOPNOTSUPP. */
static int mdt_obd_log_cancel(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
918
/* Quotacheck callback is not supported: always answers -EOPNOTSUPP. */
static int mdt_obd_qc_callback(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
923
924
925 /*
926  * DLM handlers.
927  */
928
/*
 * Callback suite passed to ldlm_handle_enqueue0() by mdt_enqueue(). Only the
 * completion and blocking callbacks are provided; the glimpse slot is empty.
 */
static struct ldlm_callback_suite cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = ldlm_server_blocking_ast,
        .lcs_glimpse    = NULL
};
934
935 static int mdt_enqueue(struct mdt_thread_info *info)
936 {
937         int rc;
938         struct ptlrpc_request *req;
939
940         /*
941          * info->mti_dlm_req already contains swapped and (if necessary)
942          * converted dlm request.
943          */
944         LASSERT(info->mti_dlm_req != NULL);
945
946         req = mdt_info_req(info);
947         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
948         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
949                                       req, info->mti_dlm_req, &cbs);
950         return rc ? : req->rq_status;
951 }
952
953 static int mdt_convert(struct mdt_thread_info *info)
954 {
955         int rc;
956         struct ptlrpc_request *req;
957
958         LASSERT(info->mti_dlm_req);
959         req = mdt_info_req(info);
960         rc = ldlm_handle_convert0(req, info->mti_dlm_req);
961         return rc ? : req->rq_status;
962 }
963
/*
 * Blocking callbacks are not expected here: log an error and call LBUG().
 * The -EOPNOTSUPP return follows the LBUG() call.
 */
static int mdt_bl_callback(struct mdt_thread_info *info)
{
        CERROR("bl callbacks should not happen on MDS\n");
        LBUG();

        return -EOPNOTSUPP;
}
970
/*
 * Completion callbacks are not expected here: log an error and call LBUG().
 * The -EOPNOTSUPP return follows the LBUG() call.
 */
static int mdt_cp_callback(struct mdt_thread_info *info)
{
        CERROR("cp callbacks should not happen on MDS\n");
        LBUG();

        return -EOPNOTSUPP;
}
977
978 /*
979  * Build (DLM) resource name from fid.
980  */
981 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
982                                        struct ldlm_res_id *name)
983 {
984         memset(name, 0, sizeof *name);
985         name->name[0] = fid_seq(f);
986         name->name[1] = fid_oid(f);
987         name->name[2] = fid_ver(f);
988         return name;
989 }
990
991 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
992 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
993              struct lustre_handle *lh, ldlm_mode_t mode,
994              ldlm_policy_data_t *policy,
995              struct ldlm_res_id *res_id)
996 {
997         int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
998         int rc;
999
1000         LASSERT(ns != NULL);
1001         LASSERT(lh != NULL);
1002         LASSERT(f != NULL);
1003
1004         rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id),
1005                                     LDLM_IBITS, policy, mode, &flags,
1006                                     ldlm_blocking_ast, ldlm_completion_ast,
1007                                     NULL, NULL, 0, NULL, lh);
1008         return rc == ELDLM_OK ? 0 : -EIO;
1009 }
1010
1011 /* just call ldlm_lock_decref() if decref,
1012  * else we only call ptlrpc_save_lock() to save this lock in req.
1013  * when transaction committed, req will be released, and lock will, too */
1014 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
1015                 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
1016 {
1017         {
1018         /* FIXME: this is debug stuff, remove it later. */
1019                 struct ldlm_lock *lock = ldlm_handle2lock(lh);
1020                 if (!lock) {
1021                         CERROR("invalid lock handle "LPX64, lh->cookie);
1022                         LBUG();
1023                 }
1024                 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
1025                 LDLM_LOCK_PUT(lock);
1026         }
1027         if (decref)
1028                 ldlm_lock_decref(lh, mode);
1029         else
1030                 ptlrpc_save_lock(req, lh, mode);
1031 }
1032
1033 static struct mdt_object *mdt_obj(struct lu_object *o)
1034 {
1035         LASSERT(lu_device_is_mdt(o->lo_dev));
1036         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
1037 }
1038
1039 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
1040                                    struct mdt_device *d,
1041                                    const struct lu_fid *f)
1042 {
1043         struct lu_object *o;
1044         struct mdt_object *m;
1045         ENTRY;
1046
1047         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
1048         if (IS_ERR(o))
1049                 m = (struct mdt_object *)o;
1050         else
1051                 m = mdt_obj(o);
1052         RETURN(m);
1053 }
1054
1055 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1056                     struct mdt_lock_handle *lh, __u64 ibits)
1057 {
1058         ldlm_policy_data_t *policy = &info->mti_policy;
1059         struct ldlm_res_id *res_id = &info->mti_res_id;
1060         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1061         int rc;
1062         ENTRY;
1063
1064         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1065         LASSERT(lh->mlh_mode != LCK_MINMODE);
1066
1067         policy->l_inodebits.bits = ibits;
1068
1069         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
1070                       policy, res_id);
1071         RETURN(rc);
1072 }
1073
1074 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1075                        struct mdt_lock_handle *lh, int decref)
1076 {
1077         struct ptlrpc_request *req = mdt_info_req(info);
1078         ENTRY;
1079
1080         if (lustre_handle_is_used(&lh->mlh_lh)) {
1081                 fid_unlock(req, mdt_object_fid(o),
1082                            &lh->mlh_lh, lh->mlh_mode, decref);
1083                 lh->mlh_lh.cookie = 0;
1084         }
1085         EXIT;
1086 }
1087
1088 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1089                                         const struct lu_fid *f,
1090                                         struct mdt_lock_handle *lh,
1091                                         __u64 ibits)
1092 {
1093         struct mdt_object *o;
1094
1095         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
1096         if (!IS_ERR(o)) {
1097                 int rc;
1098
1099                 rc = mdt_object_lock(info, o, lh, ibits);
1100                 if (rc != 0) {
1101                         mdt_object_put(info->mti_ctxt, o);
1102                         o = ERR_PTR(rc);
1103                 }
1104         }
1105         return o;
1106 }
1107
1108 void mdt_object_unlock_put(struct mdt_thread_info * info,
1109                            struct mdt_object * o,
1110                            struct mdt_lock_handle *lh,
1111                            int decref)
1112 {
1113         mdt_object_unlock(info, o, lh, decref);
1114         mdt_object_put(info->mti_ctxt, o);
1115 }
1116
1117 static struct mdt_handler *mdt_handler_find(__u32 opc,
1118                                             struct mdt_opc_slice *supported)
1119 {
1120         struct mdt_opc_slice *s;
1121         struct mdt_handler   *h;
1122
1123         h = NULL;
1124         for (s = supported; s->mos_hs != NULL; s++) {
1125                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1126                         h = s->mos_hs + (opc - s->mos_opc_start);
1127                         if (h->mh_opc != 0)
1128                                 LASSERT(h->mh_opc == opc);
1129                         else
1130                                 h = NULL; /* unsupported opc */
1131                         break;
1132                 }
1133         }
1134         return h;
1135 }
1136
1137 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1138 {
1139         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
1140 }
1141
1142 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1143 {
1144         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
1145 }
1146
/*
 * Resource name compatibility conversion of a dlm request. Not implemented
 * yet: the request is left untouched and 0 is returned.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        /* XXX something... later. */
        return 0;
}
1153
/*
 * Compatibility conversion of a dlm reply. Not implemented yet: the reply is
 * left untouched and 0 is returned.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        /* XXX something... later. */
        return 0;
}
1159
1160 /*
1161  * Generic code handling requests that have struct mdt_body passed in:
1162  *
1163  *  - extract mdt_body from request and save it in @info, if present;
1164  *
1165  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1166  *  @info;
1167  *
1168  *  - if HABEO_CORPUS flag is set for this request type check whether object
1169  *  actually exists on storage (lu_object_exists()).
1170  *
1171  */
1172 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1173 {
1174         const struct mdt_body   *body;
1175         struct mdt_object       *obj;
1176         const struct lu_context *ctx;
1177         struct req_capsule      *pill;
1178         int                     rc;
1179
1180         ctx = info->mti_ctxt;
1181         pill = &info->mti_pill;
1182
1183         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1184         if (body != NULL) {
1185                 if (fid_is_sane(&body->fid1)) {
1186                         obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
1187                         if (!IS_ERR(obj)) {
1188                                 if ((flags & HABEO_CORPUS) &&
1189                                     !lu_object_exists(&obj->mot_obj.mo_lu)) {
1190                                         mdt_object_put(ctx, obj);
1191                                         rc = -ENOENT;
1192                                 } else {
1193                                         info->mti_object = obj;
1194                                         rc = 0;
1195                                 }
1196                         } else
1197                                 rc = PTR_ERR(obj);
1198                 } else {
1199                         CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1200                         rc = -EINVAL;
1201                 }
1202         } else
1203                 rc = -EFAULT;
1204         return rc;
1205 }
1206
1207 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1208 {
1209         struct req_capsule *pill;
1210         int rc;
1211
1212         ENTRY;
1213         pill = &info->mti_pill;
1214
1215         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1216                 rc = mdt_body_unpack(info, flags);
1217         else
1218                 rc = 0;
1219
1220         if (rc == 0 && (flags & HABEO_REFERO)) {
1221                 struct mdt_device       *mdt = info->mti_mdt;
1222                 /*pack reply*/
1223                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1224                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1225                                              mdt->mdt_max_mdsize);
1226                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1227                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1228                                              mdt->mdt_max_cookiesize);
1229
1230                 rc = req_capsule_pack(pill);
1231         }
1232         RETURN(rc);
1233 }
1234
/*
 * Context key with external linkage. NOTE(review): it is not referenced in
 * this part of the file; presumably it keys per-thread transaction data --
 * confirm against its users.
 */
struct lu_context_key mdt_txn_key;
1236
1237 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1238 {
1239         struct mdt_device     *mdt = info->mti_mdt;
1240         struct ptlrpc_request *req = mdt_info_req(info);
1241         struct obd_export     *exp = req->rq_export;
1242
1243         /* sometimes the reply message has not been successfully packed */
1244         if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1245                 return;
1246
1247         if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1248                 return;
1249
1250         /*XXX: assert on this when all code will be finished */
1251         if (rc != 0 && info->mti_transno != 0) {
1252                 info->mti_transno = 0;
1253                 CERROR("Transno is not 0 while rc is %i!\n", rc);
1254         }
1255
1256         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1257                info->mti_transno, exp->exp_obd->obd_last_committed);
1258
1259         spin_lock(&mdt->mdt_transno_lock);
1260         req->rq_transno = info->mti_transno;
1261         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1262
1263         target_committed_to_req(req);
1264
1265         spin_unlock(&mdt->mdt_transno_lock);
1266         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1267         //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1268 }
1269
1270 /*
1271  * Invoke handler for this request opc. Also do necessary preprocessing
1272  * (according to handler ->mh_flags), and post-processing (setting of
1273  * ->last_{xid,committed}).
1274  */
1275 static int mdt_req_handle(struct mdt_thread_info *info,
1276                           struct mdt_handler *h, struct ptlrpc_request *req)
1277 {
1278         int   rc;
1279         __u32 flags;
1280
1281         ENTRY;
1282
1283         LASSERT(h->mh_act != NULL);
1284         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1285         LASSERT(current->journal_info == NULL);
1286
1287         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1288
1289         if (h->mh_fail_id != 0)
1290                 MDT_FAIL_RETURN(h->mh_fail_id, 0);
1291
1292         rc = 0;
1293         flags = h->mh_flags;
1294         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1295
1296         if (h->mh_fmt != NULL) {
1297                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1298                 rc = mdt_unpack_req_pack_rep(info, flags);
1299         }
1300
1301         if (rc == 0 && flags & MUTABOR &&
1302             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1303                 rc = -EROFS;
1304
1305         if (rc == 0 && flags & HABEO_CLAVIS) {
1306                 struct ldlm_request *dlm_req;
1307
1308                 LASSERT(h->mh_fmt != NULL);
1309
1310                 dlm_req = req_capsule_client_get(&info->mti_pill,&RMF_DLM_REQ);
1311                 if (dlm_req != NULL) {
1312                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1313                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1314                                                                  dlm_req);
1315                         info->mti_dlm_req = dlm_req;
1316                 } else {
1317                         CERROR("Can't unpack dlm request\n");
1318                         rc = -EFAULT;
1319                 }
1320         }
1321
1322         if (rc == 0)
1323                 /*
1324                  * Process request.
1325                  */
1326                 rc = h->mh_act(info);
1327         /*
1328          * XXX result value is unconditionally shoved into ->rq_status
1329          * (original code sometimes placed error code into ->rq_status, and
1330          * sometimes returned it to the
1331          * caller). ptlrpc_server_handle_request() doesn't check return value
1332          * anyway.
1333          */
1334         req->rq_status = rc;
1335         rc = 0;
1336         LASSERT(current->journal_info == NULL);
1337
1338         if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname) {
1339                 struct ldlm_reply *dlmrep;
1340
1341                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1342                 if (dlmrep != NULL)
1343                         rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1344         }
1345
1346         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1347
1348 #if 0
1349         if (h->mh_opc != MDS_DISCONNECT &&
1350             h->mh_opc != MDS_READPAGE &&
1351             h->mh_opc != LDLM_ENQUEUE) {
1352                 mdt_finish_reply(info, req->rq_status);
1353         }
1354 #endif
1355         RETURN(rc);
1356 }
1357
1358 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1359 {
1360         lh->mlh_lh.cookie = 0ull;
1361         lh->mlh_mode = LCK_MINMODE;
1362 }
1363
1364 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1365 {
1366         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1367 }
1368
1369 static void mdt_thread_info_init(struct ptlrpc_request *req,
1370                                  struct mdt_thread_info *info)
1371 {
1372         int i;
1373
1374         memset(info, 0, sizeof(*info));
1375
1376         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1377         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1378                 info->mti_rep_buf_size[i] = -1;
1379
1380         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1381                 mdt_lock_handle_init(&info->mti_lh[i]);
1382
1383         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1384         info->mti_ctxt = req->rq_svc_thread->t_ctx;
1385         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1386         /* it can be NULL while CONNECT */
1387         if (req->rq_export)
1388                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1389         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1390                          info->mti_rep_buf_size);
1391 }
1392
1393 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1394 {
1395         int i;
1396
1397         req_capsule_fini(&info->mti_pill);
1398         if (info->mti_object != NULL) {
1399                 mdt_object_put(info->mti_ctxt, info->mti_object);
1400                 info->mti_object = NULL;
1401         }
1402         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1403                 mdt_lock_handle_fini(&info->mti_lh[i]);
1404 }
1405
1406 /* mds/handler.c */
1407 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1408                                        struct obd_device *obd, int *process);
1409 /*
1410  * Handle recovery. Return:
1411  *        +1: continue request processing;
1412  *       -ve: abort immediately with the given error code;
1413  *         0: send reply with error code in req->rq_status;
1414  */
1415 static int mdt_recovery(struct ptlrpc_request *req)
1416 {
1417         int recovering;
1418         int abort_recovery;
1419         struct obd_device *obd;
1420
1421         ENTRY;
1422
1423         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT)
1424                 RETURN(+1);
1425
1426         if (req->rq_export == NULL) {
1427                 CERROR("operation %d on unconnected MDS from %s\n",
1428                        lustre_msg_get_opc(req->rq_reqmsg),
1429                        libcfs_id2str(req->rq_peer));
1430                 req->rq_status = -ENOTCONN;
1431                 RETURN(-ENOTCONN);
1432         }
1433
1434         /* sanity check: if the xid matches, the request must be marked as a
1435          * resent or replayed */
1436         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req) ||
1437                       req->rq_xid == req_exp_last_close_xid(req),
1438                       lustre_msg_get_flags(req->rq_reqmsg) &
1439                       (MSG_RESENT | MSG_REPLAY)),
1440                  "rq_xid "LPU64" matches last_xid, "
1441                  "expected RESENT flag\n", req->rq_xid);
1442
1443         /* else: note the opposite is not always true; a RESENT req after a
1444          * failover will usually not match the last_xid, since it was likely
1445          * never committed. A REPLAYed request will almost never match the
1446          * last xid, however it could for a committed, but still retained,
1447          * open. */
1448
1449         obd = req->rq_export->exp_obd;
1450
1451         /* Check for aborted recovery... */
1452         spin_lock_bh(&obd->obd_processing_task_lock);
1453         abort_recovery = obd->obd_abort_recovery;
1454         recovering = obd->obd_recovering;
1455         spin_unlock_bh(&obd->obd_processing_task_lock);
1456         if (abort_recovery) {
1457                 target_abort_recovery(obd);
1458         } else if (recovering) {
1459                 int rc;
1460                 int should_process;
1461
1462                 rc = mds_filter_recovery_request(req, obd, &should_process);
1463                 if (rc != 0 || !should_process) {
1464                         RETURN(rc);
1465                 }
1466         }
1467         RETURN(+1);
1468 }
1469
1470 static int mdt_reply(struct ptlrpc_request *req, int rc,
1471                      struct mdt_thread_info *info)
1472 {
1473         struct obd_device *obd;
1474         ENTRY;
1475
1476         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1477                 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1478                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1479
1480                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1481                 if (obd && obd->obd_recovering) {
1482                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1483                         RETURN(target_queue_final_reply(req, rc));
1484                 } else {
1485                         /* Lost a race with recovery; let the error path
1486                          * DTRT. */
1487                         rc = req->rq_status = -ENOTCONN;
1488                 }
1489         }
1490         target_send_reply(req, rc, info->mti_fail_id);
1491         RETURN(0);
1492 }
1493
1494 /* mds/handler.c */
1495 extern int mds_msg_check_version(struct lustre_msg *msg);
1496
1497 static int mdt_handle0(struct ptlrpc_request *req,
1498                        struct mdt_thread_info *info,
1499                        struct mdt_opc_slice *supported)
1500 {
1501         struct mdt_handler *h;
1502         struct lustre_msg  *msg;
1503         int                 rc;
1504
1505         ENTRY;
1506
1507         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1508
1509         LASSERT(current->journal_info == NULL);
1510
1511         msg = req->rq_reqmsg;
1512         rc = mds_msg_check_version(msg);
1513         if (rc == 0) {
1514                 rc = mdt_recovery(req);
1515                 switch (rc) {
1516                 case +1:
1517                         h = mdt_handler_find(lustre_msg_get_opc(msg),
1518                                              supported);
1519                         if (h != NULL)
1520                                 rc = mdt_req_handle(info, h, req);
1521                         else {
1522                                 req->rq_status = -ENOTSUPP;
1523                                 rc = ptlrpc_error(req);
1524                                 break;
1525                         }
1526                         /* fall through */
1527                 case 0:
1528                         rc = mdt_reply(req, rc, info);
1529                 }
1530         } else
1531                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1532         RETURN(rc);
1533 }
1534
1535 /*
1536  * MDT handler function called by ptlrpc service thread when request comes.
1537  *
1538  * XXX common "target" functionality should be factored into separate module
1539  * shared by mdt, ost and stand-alone services like fld.
1540  */
1541 static int mdt_handle_common(struct ptlrpc_request *req,
1542                              struct mdt_opc_slice *supported)
1543 {
1544         struct lu_context      *ctx;
1545         struct mdt_thread_info *info;
1546         int                     rc;
1547         ENTRY;
1548
1549         ctx = req->rq_svc_thread->t_ctx;
1550         LASSERT(ctx != NULL);
1551         LASSERT(ctx->lc_thread == req->rq_svc_thread);
1552         info = lu_context_key_get(ctx, &mdt_thread_key);
1553         LASSERT(info != NULL);
1554
1555         mdt_thread_info_init(req, info);
1556
1557         rc = mdt_handle0(req, info, supported);
1558
1559         mdt_thread_info_fini(info);
1560         RETURN(rc);
1561 }
1562
1563 static int mdt_regular_handle(struct ptlrpc_request *req)
1564 {
1565         return mdt_handle_common(req, mdt_regular_handlers);
1566 }
1567
1568 static int mdt_readpage_handle(struct ptlrpc_request *req)
1569 {
1570         return mdt_handle_common(req, mdt_readpage_handlers);
1571 }
1572
1573 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1574 {
1575         return mdt_handle_common(req, mdt_seq_handlers);
1576 }
1577
1578 static int mdt_mdss_handle(struct ptlrpc_request *req)
1579 {
1580         return mdt_handle_common(req, mdt_seq_handlers);
1581 }
1582
1583 static int mdt_dtss_handle(struct ptlrpc_request *req)
1584 {
1585         return mdt_handle_common(req, mdt_seq_handlers);
1586 }
1587
1588 static int mdt_fld_handle(struct ptlrpc_request *req)
1589 {
1590         return mdt_handle_common(req, mdt_fld_handlers);
1591 }
1592
/*
 * Intent codes. The values are used as indices into the mdt_it_flavor[]
 * table and are passed to the intent handlers as their first argument.
 */
enum mdt_it_code {
        MDT_IT_OPEN,
        MDT_IT_OCREAT,
        MDT_IT_CREATE,
        MDT_IT_GETATTR,
        MDT_IT_READDIR,
        MDT_IT_LOOKUP,
        MDT_IT_UNLINK,
        MDT_IT_TRUNC,
        MDT_IT_GETXATTR,
        /* last entry: equals the number of intent codes above */
        MDT_IT_NR
};
1605
1606 static int mdt_intent_getattr(enum mdt_it_code opcode,
1607                               struct mdt_thread_info *info,
1608                               struct ldlm_lock **,
1609                               int);
1610 static int mdt_intent_reint(enum mdt_it_code opcode,
1611                             struct mdt_thread_info *info,
1612                             struct ldlm_lock **,
1613                             int);
1614
/*
 * Per-intent description table, indexed by enum mdt_it_code.
 *
 * Entries with a NULL ->it_act have no handler in this table (READDIR,
 * UNLINK, TRUNC, GETXATTR). ->it_reint is only set for the intents tied to
 * a reint opcode (OPEN, OCREAT, CREATE, UNLINK).
 */
static struct mdt_it_flavor {
        /* request format of the intent, NULL when there is none */
        const struct req_format *it_fmt;
        /* HABEO_REFERO / MUTABOR style flags, 0 for none */
        __u32                    it_flags;
        /* handler invoked for the intent, NULL when there is none */
        int                    (*it_act)(enum mdt_it_code ,
                                         struct mdt_thread_info *,
                                         struct ldlm_lock **,
                                         int);
        /* REINT_* opcode associated with the intent */
        long                     it_reint;
} mdt_it_flavor[] = {
        [MDT_IT_OPEN]     = {
                .it_fmt   = &RQF_LDLM_INTENT,
                /*.it_flags = HABEO_REFERO,*/
                .it_flags = 0,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_OPEN
        },
        [MDT_IT_OCREAT]   = {
                .it_fmt   = &RQF_LDLM_INTENT,
                .it_flags = MUTABOR,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_OPEN
        },
        [MDT_IT_CREATE]   = {
                .it_fmt   = &RQF_LDLM_INTENT,
                .it_flags = MUTABOR,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_CREATE
        },
        [MDT_IT_GETATTR]  = {
                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
                .it_flags = HABEO_REFERO,
                .it_act   = mdt_intent_getattr
        },
        [MDT_IT_READDIR]  = {
                .it_fmt   = NULL,
                .it_flags = 0,
                .it_act   = NULL
        },
        [MDT_IT_LOOKUP]   = {
                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
                .it_flags = HABEO_REFERO,
                .it_act   = mdt_intent_getattr
        },
        [MDT_IT_UNLINK]   = {
                .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
                .it_flags = MUTABOR,
                .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
                .it_reint = REINT_UNLINK
        },
        [MDT_IT_TRUNC]    = {
                .it_fmt   = NULL,
                .it_flags = MUTABOR,
                .it_act   = NULL
        },
        [MDT_IT_GETXATTR] = {
                .it_fmt   = NULL,
                .it_flags = 0,
                .it_act   = NULL
        }
};
1675
1676 static int mdt_intent_getattr(enum mdt_it_code opcode,
1677                               struct mdt_thread_info *info,
1678                               struct ldlm_lock **lockp,
1679                               int flags)
1680 {
1681         struct ldlm_lock       *old_lock = *lockp;
1682         struct ldlm_lock       *new_lock = NULL;
1683         struct ptlrpc_request  *req = mdt_info_req(info);
1684         struct ldlm_reply      *ldlm_rep;
1685         struct mdt_lock_handle  tmp_lock;
1686         struct mdt_lock_handle *lhc = &tmp_lock;
1687         __u64                   child_bits;
1688
1689         ENTRY;
1690
1691         switch (opcode) {
1692         case MDT_IT_LOOKUP:
1693                 child_bits = MDS_INODELOCK_LOOKUP;
1694                 break;
1695         case MDT_IT_GETATTR:
1696                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1697                 break;
1698         default:
1699                 CERROR("Unhandled till now");
1700                 RETURN(-EINVAL);
1701         }
1702
1703         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1704         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
1705
1706         ldlm_rep->lock_policy_res2 =
1707                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
1708         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
1709
1710         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1711                 ldlm_rep->lock_policy_res2 = 0;
1712         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1713                     ldlm_rep->lock_policy_res2) {
1714                 RETURN(ELDLM_LOCK_ABORTED);
1715         }
1716
1717         new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1718         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1719                 RETURN(0);
1720
1721         LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1722                  opcode, lhc->mlh_lh.cookie);
1723
1724         *lockp = new_lock;
1725
1726         /* FIXME:This only happens when MDT can handle RESENT */
1727         if (new_lock->l_export == req->rq_export) {
1728                 /* Already gave this to the client, which means that we
1729                  * reconstructed a reply. */
1730                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1731                         MSG_RESENT);
1732                 RETURN(ELDLM_LOCK_REPLACED);
1733         }
1734
1735         /* TODO:
1736          * These are copied from mds/hander.c, and should be factored into
1737          * ldlm module in order to share these code, and be easy for merge.
1738          */
1739
1740         /* Fixup the lock to be given to the client */
1741         lock_res_and_lock(new_lock);
1742         new_lock->l_readers = 0;
1743         new_lock->l_writers = 0;
1744
1745         new_lock->l_export = class_export_get(req->rq_export);
1746         list_add(&new_lock->l_export_chain,
1747                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1748
1749         new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1750         new_lock->l_completion_ast = old_lock->l_completion_ast;
1751
1752         new_lock->l_remote_handle = old_lock->l_remote_handle;
1753
1754         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1755
1756         unlock_res_and_lock(new_lock);
1757         LDLM_LOCK_PUT(new_lock);
1758
1759         RETURN(ELDLM_LOCK_REPLACED);
1760 }
1761
1762 static int mdt_intent_reint(enum mdt_it_code opcode,
1763                             struct mdt_thread_info *info,
1764                             struct ldlm_lock **lockp,
1765                             int flags)
1766 {
1767         long opc;
1768         int rc;
1769         struct ldlm_reply *rep;
1770
1771         static const struct req_format *intent_fmts[REINT_MAX] = {
1772                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
1773                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
1774         };
1775
1776         ENTRY;
1777
1778         opc = mdt_reint_opcode(info, intent_fmts);
1779         if (opc < 0)
1780                 RETURN(opc);
1781
1782         if (mdt_it_flavor[opcode].it_reint != opc) {
1783                 CERROR("Reint code %ld doesn't match intent: %d\n",
1784                        opc, opcode);
1785                 RETURN(-EPROTO);
1786         }
1787
1788         rc = mdt_reint_internal(info, opc);
1789
1790         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1791         if (rep == NULL)
1792                 RETURN(-EFAULT);
1793         rep->lock_policy_res2 = rc;
1794
1795         mdt_set_disposition(info, rep, DISP_IT_EXECD);
1796 #if 0
1797         mdt_finish_reply(info, rc);
1798 #endif
1799         RETURN(ELDLM_LOCK_ABORTED);
1800 }
1801
1802 static int mdt_intent_code(long itcode)
1803 {
1804         int rc;
1805
1806         switch(itcode) {
1807         case IT_OPEN:
1808                 rc = MDT_IT_OPEN;
1809                 break;
1810         case IT_OPEN|IT_CREAT:
1811                 rc = MDT_IT_OCREAT;
1812                 break;
1813         case IT_CREAT:
1814                 rc = MDT_IT_CREATE;
1815                 break;
1816         case IT_READDIR:
1817                 rc = MDT_IT_READDIR;
1818                 break;
1819         case IT_GETATTR:
1820                 rc = MDT_IT_GETATTR;
1821                 break;
1822         case IT_LOOKUP:
1823                 rc = MDT_IT_LOOKUP;
1824                 break;
1825         case IT_UNLINK:
1826                 rc = MDT_IT_UNLINK;
1827                 break;
1828         case IT_TRUNC:
1829                 rc = MDT_IT_TRUNC;
1830                 break;
1831         case IT_GETXATTR:
1832                 rc = MDT_IT_GETXATTR;
1833                 break;
1834         default:
1835                 CERROR("Unknown intent opcode: %ld\n", itcode);
1836                 rc = -EINVAL;
1837                 break;
1838         }
1839         return rc;
1840 }
1841
1842 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1843                           struct ldlm_lock **lockp, int flags)
1844 {
1845         struct req_capsule   *pill;
1846         struct mdt_it_flavor *flv;
1847         int opc;
1848         int rc;
1849         ENTRY;
1850
1851         opc = mdt_intent_code(itopc);
1852         if (opc < 0)
1853                 RETURN(-EINVAL);
1854
1855         pill = &info->mti_pill;
1856         flv  = &mdt_it_flavor[opc];
1857
1858         if (flv->it_fmt != NULL)
1859                 req_capsule_extend(pill, flv->it_fmt);
1860
1861         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1862         if (rc == 0) {
1863                 struct ptlrpc_request *req = mdt_info_req(info);
1864                 if (flv->it_flags & MUTABOR &&
1865                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1866                         rc = -EROFS;
1867         }
1868         if (rc == 0 && flv->it_act != NULL) {
1869                 /* execute policy */
1870                 rc = flv->it_act(opc, info, lockp, flags);
1871         } else
1872                 rc = -EOPNOTSUPP;
1873         RETURN(rc);
1874 }
1875
1876 static int mdt_intent_policy(struct ldlm_namespace *ns,
1877                              struct ldlm_lock **lockp, void *req_cookie,
1878                              ldlm_mode_t mode, int flags, void *data)
1879 {
1880         struct mdt_thread_info *info;
1881         struct ptlrpc_request  *req  =  req_cookie;
1882         struct ldlm_intent     *it;
1883         struct req_capsule     *pill;
1884         struct ldlm_lock       *lock = *lockp;
1885         int rc;
1886
1887         ENTRY;
1888
1889         LASSERT(req != NULL);
1890
1891         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1892         LASSERT(info != NULL);
1893         pill = &info->mti_pill;
1894         LASSERT(pill->rc_req == req);
1895
1896         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
1897                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1898                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1899                 if (it != NULL) {
1900                         LDLM_DEBUG(lock, "intent policy opc: %s",
1901                                    ldlm_it2str(it->opc));
1902
1903                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
1904                         if (rc == 0)
1905                                 rc = ELDLM_OK;
1906                 } else
1907                         rc = -EFAULT;
1908         } else {
1909                 /* No intent was provided */
1910                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1911                 rc = req_capsule_pack(pill);
1912         }
1913         RETURN(rc);
1914 }
1915
1916 /*
1917  * Seq wrappers
1918  */
1919 static int mdt_seq_fini(const struct lu_context *ctx,
1920                         struct mdt_device *m)
1921 {
1922         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1923         ENTRY;
1924
1925         if (ls && ls->ls_server_seq) {
1926                 seq_server_fini(ls->ls_server_seq, ctx);
1927                 OBD_FREE_PTR(ls->ls_server_seq);
1928                 ls->ls_server_seq = NULL;
1929         }
1930         
1931         if (ls && ls->ls_control_seq) {
1932                 seq_server_fini(ls->ls_control_seq, ctx);
1933                 OBD_FREE_PTR(ls->ls_control_seq);
1934                 ls->ls_control_seq = NULL;
1935         }
1936
1937         if (ls && ls->ls_client_seq) {
1938                 seq_client_fini(ls->ls_client_seq);
1939                 OBD_FREE_PTR(ls->ls_client_seq);
1940                 ls->ls_client_seq = NULL;
1941         }
1942
1943         RETURN(0);
1944 }
1945
1946 static int mdt_seq_init(const struct lu_context *ctx,
1947                         const char *uuid,
1948                         struct mdt_device *m)
1949 {
1950         struct lu_site *ls;
1951         char *prefix;
1952         int rc;
1953         ENTRY;
1954
1955         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1956
1957         /*
1958          * This is sequence-controller node. Init seq-controller server on local
1959          * MDT.
1960          */
1961         if (ls->ls_node_id == 0) {
1962                 LASSERT(ls->ls_control_seq == NULL);
1963
1964                 OBD_ALLOC_PTR(ls->ls_control_seq);
1965                 if (ls->ls_control_seq == NULL)
1966                         RETURN(-ENOMEM);
1967
1968                 rc = seq_server_init(ls->ls_control_seq,
1969                                      m->mdt_bottom, uuid,
1970                                      LUSTRE_SEQ_CONTROLLER,
1971                                      ctx);
1972
1973                 if (rc)
1974                         GOTO(out_seq_fini, rc);
1975                 
1976                 OBD_ALLOC_PTR(ls->ls_client_seq);
1977                 if (ls->ls_client_seq == NULL)
1978                         GOTO(out_seq_fini, rc = -ENOMEM);
1979
1980                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
1981                 if (prefix == NULL) {
1982                         OBD_FREE_PTR(ls->ls_client_seq);
1983                         GOTO(out_seq_fini, rc = -ENOMEM);
1984                 }
1985
1986                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
1987                          uuid);
1988
1989                 /*
1990                  * Init seq-controller client after seq-controller server is
1991                  * ready. Pass ls->ls_control_seq to it for direct talking.
1992                  */
1993                 rc = seq_client_init(ls->ls_client_seq, NULL,
1994                                      LUSTRE_SEQ_METADATA, prefix,
1995                                      ls->ls_control_seq, ctx);
1996                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
1997
1998                 if (rc)
1999                         GOTO(out_seq_fini, rc);
2000         }
2001
2002         /* Init seq-server on local MDT */
2003         LASSERT(ls->ls_server_seq == NULL);
2004         
2005         OBD_ALLOC_PTR(ls->ls_server_seq);
2006         if (ls->ls_server_seq == NULL)
2007                 GOTO(out_seq_fini, rc = -ENOMEM);
2008
2009         rc = seq_server_init(ls->ls_server_seq,
2010                              m->mdt_bottom, uuid,
2011                              LUSTRE_SEQ_SERVER,
2012                              ctx);
2013         if (rc)
2014                 GOTO(out_seq_fini, rc = -ENOMEM);
2015
2016         /* Assign seq-controller client to local seq-server. */
2017         if (ls->ls_node_id == 0) {
2018                 LASSERT(ls->ls_client_seq != NULL);
2019                 
2020                 rc = seq_server_set_cli(ls->ls_server_seq,
2021                                         ls->ls_client_seq,
2022                                         ctx);
2023         }
2024         
2025         EXIT;
2026 out_seq_fini:
2027         if (rc)
2028                 mdt_seq_fini(ctx, m);
2029
2030         return rc;
2031 }
2032
2033 /*
2034  * Init client sequence manager which is used by local MDS to talk to sequence
2035  * controller on remote node.
2036  */
2037 static int mdt_seq_init_cli(const struct lu_context *ctx,
2038                             struct mdt_device *m,
2039                             struct lustre_cfg *cfg)
2040 {
2041         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2042         struct obd_device *mdc;
2043         struct obd_uuid   *uuidp, *mdcuuidp;
2044         char              *uuid_str, *mdc_uuid_str;
2045         int               rc;
2046         int               index;
2047         struct mdt_thread_info *info;
2048         char *p, *index_string = lustre_cfg_string(cfg, 2);
2049         ENTRY;
2050
2051         info = lu_context_key_get(ctx, &mdt_thread_key);
2052         uuidp = &info->mti_u.uuid[0];
2053         mdcuuidp = &info->mti_u.uuid[1];
2054
2055         LASSERT(index_string);
2056
2057         index = simple_strtol(index_string, &p, 10);
2058         if (*p) {
2059                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2060                 RETURN(-EINVAL);
2061         }
2062
2063         /* check if this is adding the first MDC and controller is not yet
2064          * initialized. */
2065         if (index != 0 || ls->ls_client_seq)
2066                 RETURN(0);
2067
2068         uuid_str = lustre_cfg_string(cfg, 1);
2069         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2070         obd_str2uuid(uuidp, uuid_str);
2071         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2072
2073         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2074         if (!mdc) {
2075                 CERROR("can't find controller MDC by uuid %s\n",
2076                        uuid_str);
2077                 rc = -ENOENT;
2078         } else if (!mdc->obd_set_up) {
2079                 CERROR("target %s not set up\n", mdc->obd_name);
2080                 rc = -EINVAL;
2081         } else {
2082                 struct lustre_handle conn = {0, };
2083
2084                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2085                        mdc->obd_name, mdc->obd_uuid.uuid);
2086
2087                 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
2088
2089                 if (rc) {
2090                         CERROR("target %s connect error %d\n",
2091                                mdc->obd_name, rc);
2092                 } else {
2093                         ls->ls_client_exp = class_conn2export(&conn);
2094
2095                         OBD_ALLOC_PTR(ls->ls_client_seq);
2096
2097                         if (ls->ls_client_seq != NULL) {
2098                                 char *prefix;
2099
2100                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2101                                 if (!prefix)
2102                                         RETURN(-ENOMEM);
2103
2104                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2105                                          mdc->obd_name);
2106
2107                                 rc = seq_client_init(ls->ls_client_seq,
2108                                                      ls->ls_client_exp,
2109                                                      LUSTRE_SEQ_METADATA,
2110                                                      prefix, NULL, NULL);
2111                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2112                         } else
2113                                 rc = -ENOMEM;
2114
2115                         if (rc)
2116                                 RETURN(rc);
2117
2118                         LASSERT(ls->ls_server_seq != NULL);
2119
2120                         rc = seq_server_set_cli(ls->ls_server_seq,
2121                                                 ls->ls_client_seq,
2122                                                 ctx);
2123                 }
2124         }
2125
2126         RETURN(rc);
2127 }
2128
2129 static void mdt_seq_fini_cli(struct mdt_device *m)
2130 {
2131         struct lu_site *ls;
2132         int rc;
2133
2134         ENTRY;
2135
2136         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2137
2138         if (ls && ls->ls_server_seq)
2139                 seq_server_set_cli(ls->ls_server_seq,
2140                                    NULL, NULL);
2141
2142         if (ls && ls->ls_client_exp) {
2143                 rc = obd_disconnect(ls->ls_client_exp);
2144                 if (rc) {
2145                         CERROR("failure to disconnect "
2146                                "obd: %d\n", rc);
2147                 }
2148                 ls->ls_client_exp = NULL;
2149         }
2150         EXIT;
2151 }
2152
2153 /*
2154  * FLD wrappers
2155  */
2156 static int mdt_fld_fini(const struct lu_context *ctx,
2157                         struct mdt_device *m)
2158 {
2159         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2160         ENTRY;
2161
2162         if (ls && ls->ls_server_fld) {
2163                 fld_server_fini(ls->ls_server_fld, ctx);
2164                 OBD_FREE_PTR(ls->ls_server_fld);
2165                 ls->ls_server_fld = NULL;
2166         }
2167
2168         if (ls && ls->ls_client_fld != NULL) {
2169                 fld_client_fini(ls->ls_client_fld);
2170                 OBD_FREE_PTR(ls->ls_client_fld);
2171                 ls->ls_client_fld = NULL;
2172         }
2173
2174         RETURN(0);
2175 }
2176
2177 static int mdt_fld_init(const struct lu_context *ctx,
2178                         const char *uuid,
2179                         struct mdt_device *m)
2180 {
2181         struct lu_fld_target target;
2182         struct lu_site *ls;
2183         int rc;
2184         ENTRY;
2185
2186         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2187
2188         OBD_ALLOC_PTR(ls->ls_server_fld);
2189         if (ls->ls_server_fld == NULL)
2190                 RETURN(rc = -ENOMEM);
2191
2192         rc = fld_server_init(ls->ls_server_fld, ctx,
2193                              m->mdt_bottom, uuid);
2194         if (rc) {
2195                 OBD_FREE_PTR(ls->ls_server_fld);
2196                 ls->ls_server_fld = NULL;
2197         }
2198
2199         OBD_ALLOC_PTR(ls->ls_client_fld);
2200         if (!ls->ls_client_fld)
2201                 GOTO(out_fld_fini, rc = -ENOMEM);
2202
2203         rc = fld_client_init(ls->ls_client_fld, uuid,
2204                              LUSTRE_CLI_FLD_HASH_DHT);
2205         if (rc) {
2206                 CERROR("can't init FLD, err %d\n",  rc);        
2207                 OBD_FREE_PTR(ls->ls_client_fld);
2208                 GOTO(out_fld_fini, rc);
2209         }
2210
2211         target.ft_srv = ls->ls_server_fld;
2212         target.ft_idx = ls->ls_node_id;
2213         target.ft_exp = NULL;
2214         
2215         fld_client_add_target(ls->ls_client_fld, &target);
2216         EXIT;
2217 out_fld_fini:
2218         if (rc)
2219                 mdt_fld_fini(ctx, m);
2220         return rc;
2221 }
2222
2223 /* device init/fini methods */
2224 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2225 {
2226         if (m->mdt_regular_service != NULL) {
2227                 ptlrpc_unregister_service(m->mdt_regular_service);
2228                 m->mdt_regular_service = NULL;
2229         }
2230         if (m->mdt_readpage_service != NULL) {
2231                 ptlrpc_unregister_service(m->mdt_readpage_service);
2232                 m->mdt_readpage_service = NULL;
2233         }
2234         if (m->mdt_setattr_service != NULL) {
2235                 ptlrpc_unregister_service(m->mdt_setattr_service);
2236                 m->mdt_setattr_service = NULL;
2237         }
2238         if (m->mdt_mdsc_service != NULL) {
2239                 ptlrpc_unregister_service(m->mdt_mdsc_service);
2240                 m->mdt_mdsc_service = NULL;
2241         }
2242         if (m->mdt_mdss_service != NULL) {
2243                 ptlrpc_unregister_service(m->mdt_mdss_service);
2244                 m->mdt_mdss_service = NULL;
2245         }
2246         if (m->mdt_dtss_service != NULL) {
2247                 ptlrpc_unregister_service(m->mdt_dtss_service);
2248                 m->mdt_dtss_service = NULL;
2249         }
2250         if (m->mdt_fld_service != NULL) {
2251                 ptlrpc_unregister_service(m->mdt_fld_service);
2252                 m->mdt_fld_service = NULL;
2253         }
2254 }
2255
2256 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2257 {
2258         int rc;
2259         static struct ptlrpc_service_conf conf;
2260         ENTRY;
2261
2262         conf = (typeof(conf)) {
2263                 .psc_nbufs            = MDS_NBUFS,
2264                 .psc_bufsize          = MDS_BUFSIZE,
2265                 .psc_max_req_size     = MDS_MAXREQSIZE,
2266                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2267                 .psc_req_portal       = MDS_REQUEST_PORTAL,
2268                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2269                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2270                 /*
2271                  * We'd like to have a mechanism to set this on a per-device
2272                  * basis, but alas...
2273                  */
2274                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2275                                        MDT_MAX_THREADS),
2276                 .psc_ctx_tags      = LCT_MD_THREAD
2277         };
2278
2279         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2280         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2281                            "mdt_ldlm_client", m->mdt_ldlm_client);
2282
2283         m->mdt_regular_service =
2284                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2285                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2286                                      NULL);
2287         if (m->mdt_regular_service == NULL)
2288                 RETURN(-ENOMEM);
2289
2290         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2291         if (rc)
2292                 GOTO(err_mdt_svc, rc);
2293
2294         /*
2295          * readpage service configuration. Parameters have to be adjusted,
2296          * ideally.
2297          */
2298         conf = (typeof(conf)) {
2299                 .psc_nbufs            = MDS_NBUFS,
2300                 .psc_bufsize          = MDS_BUFSIZE,
2301                 .psc_max_req_size     = MDS_MAXREQSIZE,
2302                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2303                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2304                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2305                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2306                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2307                                        MDT_MAX_THREADS),
2308                 .psc_ctx_tags      = LCT_MD_THREAD
2309         };
2310         m->mdt_readpage_service =
2311                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2312                                      LUSTRE_MDT_NAME "_readpage",
2313                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2314                                      NULL);
2315
2316         if (m->mdt_readpage_service == NULL) {
2317                 CERROR("failed to start readpage service\n");
2318                 GOTO(err_mdt_svc, rc = -ENOMEM);
2319         }
2320
2321         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2322
2323         /*
2324          * setattr service configuration.
2325          */
2326         conf = (typeof(conf)) {
2327                 .psc_nbufs            = MDS_NBUFS,
2328                 .psc_bufsize          = MDS_BUFSIZE,
2329                 .psc_max_req_size     = MDS_MAXREQSIZE,
2330                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2331                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2332                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2333                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2334                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2335                                        MDT_MAX_THREADS),
2336                 .psc_ctx_tags      = LCT_MD_THREAD
2337         };
2338
2339         m->mdt_setattr_service =
2340                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2341                                      LUSTRE_MDT_NAME "_setattr",
2342                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2343                                      NULL);
2344
2345         if (!m->mdt_setattr_service) {
2346                 CERROR("failed to start setattr service\n");
2347                 GOTO(err_mdt_svc, rc = -ENOMEM);
2348         }
2349
2350         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2351         if (rc)
2352                 GOTO(err_mdt_svc, rc);
2353
2354         /*
2355          * sequence controller service configuration
2356          */
2357         conf = (typeof(conf)) {
2358                 .psc_nbufs = MDS_NBUFS,
2359                 .psc_bufsize = MDS_BUFSIZE,
2360                 .psc_max_req_size = SEQ_MAXREQSIZE,
2361                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2362                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2363                 .psc_rep_portal = MDC_REPLY_PORTAL,
2364                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2365                 .psc_num_threads = SEQ_NUM_THREADS,
2366                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2367         };
2368
2369         m->mdt_mdsc_service =
2370                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2371                                      LUSTRE_MDT_NAME"_mdsc",
2372                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2373                                      NULL);
2374         if (!m->mdt_mdsc_service) {
2375                 CERROR("failed to start seq controller service\n");
2376                 GOTO(err_mdt_svc, rc = -ENOMEM);
2377         }
2378
2379         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2380         if (rc)
2381                 GOTO(err_mdt_svc, rc);
2382
2383         /*
2384          * metadata sequence server service configuration
2385          */
2386         conf = (typeof(conf)) {
2387                 .psc_nbufs = MDS_NBUFS,
2388                 .psc_bufsize = MDS_BUFSIZE,
2389                 .psc_max_req_size = SEQ_MAXREQSIZE,
2390                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2391                 .psc_req_portal = SEQ_METADATA_PORTAL,
2392                 .psc_rep_portal = MDC_REPLY_PORTAL,
2393                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2394                 .psc_num_threads = SEQ_NUM_THREADS,
2395                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2396         };
2397
2398         m->mdt_mdss_service =
2399                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2400                                      LUSTRE_MDT_NAME"_mdss",
2401                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2402                                      NULL);
2403         if (!m->mdt_mdss_service) {
2404                 CERROR("failed to start metadata seq server service\n");
2405                 GOTO(err_mdt_svc, rc = -ENOMEM);
2406         }
2407
2408         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2409         if (rc)
2410                 GOTO(err_mdt_svc, rc);
2411
2412
2413         /*
2414          * Data sequence server service configuration. We want to have really
2415          * cluster-wide sequences space. This is why we start only one sequence
2416          * controller which manages space.
2417          */
2418         conf = (typeof(conf)) {
2419                 .psc_nbufs = MDS_NBUFS,
2420                 .psc_bufsize = MDS_BUFSIZE,
2421                 .psc_max_req_size = SEQ_MAXREQSIZE,
2422                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2423                 .psc_req_portal = SEQ_DATA_PORTAL,
2424                 .psc_rep_portal = OSC_REPLY_PORTAL,
2425                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2426                 .psc_num_threads = SEQ_NUM_THREADS,
2427                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2428         };
2429
2430         m->mdt_dtss_service =
2431                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2432                                      LUSTRE_MDT_NAME"_dtss",
2433                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2434                                      NULL);
2435         if (!m->mdt_dtss_service) {
2436                 CERROR("failed to start data seq server service\n");
2437                 GOTO(err_mdt_svc, rc = -ENOMEM);
2438         }
2439
2440         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2441         if (rc)
2442                 GOTO(err_mdt_svc, rc);
2443
2444         /* FLD service start */
2445         conf = (typeof(conf)) {
2446                 .psc_nbufs            = MDS_NBUFS,
2447                 .psc_bufsize          = MDS_BUFSIZE,
2448                 .psc_max_req_size     = FLD_MAXREQSIZE,
2449                 .psc_max_reply_size   = FLD_MAXREPSIZE,
2450                 .psc_req_portal       = FLD_REQUEST_PORTAL,
2451                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2452                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2453                 .psc_num_threads      = FLD_NUM_THREADS,
2454                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
2455         };
2456
2457         m->mdt_fld_service =
2458                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2459                                      LUSTRE_MDT_NAME"_fld",
2460                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2461                                      NULL);
2462         if (!m->mdt_fld_service) {
2463                 CERROR("failed to start fld service\n");
2464                 GOTO(err_mdt_svc, rc = -ENOMEM);
2465         }
2466
2467         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
2468         if (rc)
2469                 GOTO(err_mdt_svc, rc);
2470
2471         EXIT;
2472 err_mdt_svc:
2473         if (rc)
2474                 mdt_stop_ptlrpc_service(m);
2475
2476         return rc;
2477 }
2478
2479 static void mdt_stack_fini(const struct lu_context *ctx,
2480                            struct mdt_device *m, struct lu_device *top)
2481 {
2482         struct lu_device        *d = top, *n;
2483         struct lustre_cfg_bufs  *bufs;
2484         struct lustre_cfg       *lcfg;
2485         struct mdt_thread_info  *info;
2486         ENTRY;
2487
2488         info = lu_context_key_get(ctx, &mdt_thread_key);
2489         LASSERT(info != NULL);
2490
2491         bufs = &info->mti_u.bufs;
2492         /* process cleanup */
2493         lustre_cfg_bufs_reset(bufs, NULL);
2494         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2495         if (!lcfg) {
2496                 CERROR("Cannot alloc lcfg!\n");
2497                 return;
2498         }
2499         LASSERT(top);
2500         top->ld_ops->ldo_process_config(ctx, top, lcfg);
2501         lustre_cfg_free(lcfg);
2502
2503         lu_site_purge(ctx, top->ld_site, ~0);
2504         while (d != NULL) {
2505                 struct obd_type *type;
2506                 struct lu_device_type *ldt = d->ld_type;
2507
2508                 /* each fini() returns next device in stack of layers
2509                  * * so we can avoid the recursion */
2510                 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2511                 lu_device_put(d);
2512                 ldt->ldt_ops->ldto_device_free(ctx, d);
2513                 type = ldt->ldt_obd_type;
2514                 type->typ_refcnt--;
2515                 class_put_type(type);
2516                 
2517                 /* switch to the next device in the layer */
2518                 d = n;
2519         }
2520         m->mdt_child = NULL;
2521 }
2522
2523 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2524                                          const char *typename,
2525                                          struct lu_device *child,
2526                                          struct lustre_cfg *cfg)
2527 {
2528         struct obd_type       *type;
2529         struct lu_device_type *ldt;
2530         struct lu_device      *d;
2531         int rc;
2532         ENTRY;
2533         
2534         /* find the type */
2535         type = class_get_type(typename);
2536         if (!type) {
2537                 CERROR("Unknown type: '%s'\n", typename);
2538                 GOTO(out, rc = -ENODEV);
2539         }
2540
2541         rc = lu_context_refill(ctx);
2542         if (rc != 0) {
2543                 CERROR("Failure to refill context: '%d'\n", rc);
2544                 GOTO(out_type, rc);
2545         }
2546
2547         ldt = type->typ_lu;
2548         if (ldt == NULL) {
2549                 CERROR("type: '%s'\n", typename);
2550                 GOTO(out_type, rc = -EINVAL);
2551         }
2552
2553         ldt->ldt_obd_type = type;
2554         d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2555         if (IS_ERR(d)) {
2556                 CERROR("Cannot allocate device: '%s'\n", typename);
2557                 GOTO(out_type, rc = -ENODEV);
2558         }
2559
2560         LASSERT(child->ld_site);
2561         d->ld_site = child->ld_site;
2562
2563         type->typ_refcnt++;
2564         rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2565         if (rc) {
2566                 CERROR("can't init device '%s', rc %d\n", typename, rc);
2567                 GOTO(out_alloc, rc);
2568         }
2569         lu_device_get(d);
2570
2571         RETURN(d);
2572
2573 out_alloc:
2574         ldt->ldt_ops->ldto_device_free(ctx, d);
2575         type->typ_refcnt--;
2576 out_type:
2577         class_put_type(type);
2578 out:
2579         return ERR_PTR(rc);
2580 }
2581
2582 static int mdt_stack_init(const struct lu_context *ctx, 
2583                           struct mdt_device *m, struct lustre_cfg *cfg)
2584 {
2585         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
2586         struct lu_device  *tmp;
2587         struct md_device  *md;
2588         int rc;
2589         ENTRY;
2590
2591         /* init the stack */
2592         tmp = mdt_layer_setup(ctx, LUSTRE_OSD_NAME, d, cfg);
2593         if (IS_ERR(tmp)) {
2594                 RETURN(PTR_ERR(tmp));
2595         }
2596         m->mdt_bottom = lu2dt_dev(tmp);
2597         d = tmp;
2598         tmp = mdt_layer_setup(ctx, LUSTRE_MDD_NAME, d, cfg);
2599         if (IS_ERR(tmp)) {
2600                 GOTO(out, rc = PTR_ERR(tmp));
2601         }
2602         d = tmp;
2603         md = lu2md_dev(d);
2604
2605         tmp = mdt_layer_setup(ctx, LUSTRE_CMM_NAME, d, cfg);
2606         if (IS_ERR(tmp)) {
2607                 GOTO(out, rc = PTR_ERR(tmp));
2608         }
2609         d = tmp;
2610         /*set mdd upcall device*/
2611         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2612
2613         md = lu2md_dev(d);
2614         /*set cmm upcall device*/
2615         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2616
2617         m->mdt_child = lu2md_dev(d);
2618
2619         /* process setup config */
2620         tmp = &m->mdt_md_dev.md_lu_dev;
2621         rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2622         GOTO(out, rc);
2623 out:
2624         /* fini from last known good lu_device */
2625         if (rc)
2626                 mdt_stack_fini(ctx, m, d);
2627
2628         return rc;
2629 }
2630
2631 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2632 {
2633         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2634         struct lu_site   *ls = d->ld_site;
2635
2636         ENTRY;
2637         target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
2638         mdt_fs_cleanup(ctx, m);
2639         ping_evictor_stop();
2640         mdt_stop_ptlrpc_service(m);
2641
2642         if (m->mdt_namespace != NULL) {
2643                 ldlm_namespace_free(m->mdt_namespace, 0);
2644                 m->mdt_namespace = NULL;
2645         }
2646
2647         mdt_seq_fini(ctx, m);
2648         mdt_seq_fini_cli(m);
2649
2650         mdt_fld_fini(ctx, m);
2651
2652         /* finish the stack */
2653         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2654
2655         if (ls) {
2656                 lu_site_fini(ls);
2657                 OBD_FREE_PTR(ls);
2658                 d->ld_site = NULL;
2659         }
2660         LASSERT(atomic_read(&d->ld_ref) == 0);
2661         md_device_fini(&m->mdt_md_dev);
2662
2663         EXIT;
2664 }
2665
2666 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2667                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
2668 {
2669         struct mdt_thread_info *info;
2670         struct obd_device      *obd;
2671         const char             *dev = lustre_cfg_string(cfg, 0);
2672         const char             *num = lustre_cfg_string(cfg, 2);
2673         struct lu_site         *s;
2674         int                     rc;
2675         ENTRY;
2676
2677         info = lu_context_key_get(ctx, &mdt_thread_key);
2678         LASSERT(info != NULL);
2679
2680         obd = class_name2obd(dev);
2681         LASSERT(obd);
2682
2683         spin_lock_init(&m->mdt_transno_lock);
2684
2685         m->mdt_max_mdsize = MAX_MD_SIZE;
2686         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2687
2688         spin_lock_init(&m->mdt_epoch_lock);
2689         /* Temporary. should parse mount option. */
2690         m->mdt_opts.mo_user_xattr = 0;
2691         m->mdt_opts.mo_acl = 0;
2692         m->mdt_opts.mo_compat_resname = 0;
2693         obd->obd_replayable = 1;
2694
2695
2696         OBD_ALLOC_PTR(s);
2697         if (s == NULL)
2698                 RETURN(-ENOMEM);
2699
2700         md_device_init(&m->mdt_md_dev, ldt);
2701         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2702         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2703         /* set this lu_device to obd, because error handling need it */
2704         obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
2705
2706         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2707         if (rc) {
2708                 CERROR("can't init lu_site, rc %d\n", rc);
2709                 GOTO(err_free_site, rc);
2710         }
2711
2712         /* init the stack */
2713         rc = mdt_stack_init(ctx, m, cfg);
2714         if (rc) {
2715                 CERROR("can't init device stack, rc %d\n", rc);
2716                 GOTO(err_fini_site, rc);
2717         }
2718
2719         /* set server index */
2720         LASSERT(num);
2721         s->ls_node_id = simple_strtol(num, NULL, 10);
2722
2723         rc = mdt_fld_init(ctx, obd->obd_name, m);
2724         if (rc)
2725                 GOTO(err_fini_stack, rc);
2726
2727         rc = mdt_seq_init(ctx, obd->obd_name, m);
2728         if (rc)
2729                 GOTO(err_fini_fld, rc);
2730
2731         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
2732                  LUSTRE_MDT_NAME"-%p", m);
2733         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
2734                                               LDLM_NAMESPACE_SERVER);
2735         if (m->mdt_namespace == NULL)
2736                 GOTO(err_fini_seq, rc = -ENOMEM);
2737
2738         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2739
2740         rc = mdt_start_ptlrpc_service(m);
2741         if (rc)
2742                 GOTO(err_free_ns, rc);
2743
2744         ping_evictor_start();
2745         rc = mdt_fs_setup(ctx, m);
2746         if (rc)
2747                 GOTO(err_stop_service, rc);
2748         RETURN(0);
2749
2750 err_stop_service:
2751         mdt_stop_ptlrpc_service(m);
2752 err_free_ns:
2753         ldlm_namespace_free(m->mdt_namespace, 0);
2754         m->mdt_namespace = NULL;
2755 err_fini_seq:
2756         mdt_seq_fini(ctx, m);
2757 err_fini_fld:
2758         mdt_fld_fini(ctx, m);
2759 err_fini_stack:
2760         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2761 err_fini_site:
2762         lu_site_fini(s);
2763 err_free_site:
2764         OBD_FREE_PTR(s);
2765
2766         md_device_fini(&m->mdt_md_dev);
2767         return (rc);
2768 }
2769
2770 /* used by MGS to process specific configurations */
2771 static int mdt_process_config(const struct lu_context *ctx,
2772                               struct lu_device *d, struct lustre_cfg *cfg)
2773 {
2774         struct mdt_device *m = mdt_dev(d);
2775         struct md_device *md_next  = m->mdt_child;
2776         struct lu_device *next = md2lu_dev(md_next);
2777         int err;
2778         ENTRY;
2779
2780         switch (cfg->lcfg_command) {
2781         case LCFG_ADD_MDC:
2782                 /*
2783                  * Add mdc hook to get first MDT uuid and connect it to
2784                  * ls->controller to use for seq manager.
2785                  */
2786                 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
2787                 if (err) {
2788                         CERROR("can't initialize controller export, "
2789                                "rc %d\n", err);
2790                 }
2791         default:
2792                 /* others are passed further */
2793                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
2794                 break;
2795         }
2796         RETURN(err);
2797 }
2798
2799 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2800                                           const struct lu_object_header *hdr,
2801                                           struct lu_device *d)
2802 {
2803         struct mdt_object *mo;
2804
2805         ENTRY;
2806
2807         OBD_ALLOC_PTR(mo);
2808         if (mo != NULL) {
2809                 struct lu_object *o;
2810                 struct lu_object_header *h;
2811
2812                 o = &mo->mot_obj.mo_lu;
2813                 h = &mo->mot_header;
2814                 lu_object_header_init(h);
2815                 lu_object_init(o, h, d);
2816                 lu_object_add_top(h, o);
2817                 o->lo_ops = &mdt_obj_ops;
2818                 RETURN(o);
2819         } else
2820                 RETURN(NULL);
2821 }
2822
2823 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2824 {
2825         struct mdt_device *d = mdt_dev(o->lo_dev);
2826         struct lu_device  *under;
2827         struct lu_object  *below;
2828         int                rc = 0;
2829         ENTRY;
2830
2831         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
2832                PFID(lu_object_fid(o)));
2833
2834         under = &d->mdt_child->md_lu_dev;
2835         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2836         if (below != NULL) {
2837                 lu_object_add(o, below);
2838         } else
2839                 rc = -ENOMEM;
2840         RETURN(rc);
2841 }
2842
2843 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2844 {
2845         struct mdt_object *mo = mdt_obj(o);
2846         struct lu_object_header *h;
2847         ENTRY;
2848
2849         h = o->lo_header;
2850         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
2851                PFID(lu_object_fid(o)));
2852
2853         lu_object_fini(o);
2854         lu_object_header_fini(h);
2855         OBD_FREE_PTR(mo);
2856         EXIT;
2857 }
2858
2859 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
2860                             lu_printer_t p, const struct lu_object *o)
2861 {
2862         return (*p)(ctxt, cookie, LUSTRE_MDT_NAME"-object@%p", o);
2863 }
2864
2865 static struct lu_device_operations mdt_lu_ops = {
2866         .ldo_object_alloc   = mdt_object_alloc,
2867         .ldo_process_config = mdt_process_config
2868 };
2869
2870 static struct lu_object_operations mdt_obj_ops = {
2871         .loo_object_init    = mdt_object_init,
2872         .loo_object_free    = mdt_object_free,
2873         .loo_object_print   = mdt_object_print
2874 };
2875
2876 /* mds_connect_internal */
2877 static int mdt_connect_internal(struct obd_export *exp,
2878                                 struct mdt_device *mdt,
2879                                 struct obd_connect_data *data)
2880 {
2881         if (data != NULL) {
2882                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2883                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2884
2885                 /* If no known bits (which should not happen, probably,
2886                    as everybody should support LOOKUP and UPDATE bits at least)
2887                    revert to compat mode with plain locks. */
2888                 if (!data->ocd_ibits_known &&
2889                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
2890                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2891
2892                 if (!mdt->mdt_opts.mo_acl)
2893                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2894
2895                 if (!mdt->mdt_opts.mo_user_xattr)
2896                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2897
2898                 exp->exp_connect_flags = data->ocd_connect_flags;
2899                 data->ocd_version = LUSTRE_VERSION_CODE;
2900                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
2901         }
2902
2903         if (mdt->mdt_opts.mo_acl &&
2904             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2905                 CWARN("%s: MDS requires ACL support but client does not\n",
2906                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2907                 return -EBADE;
2908         }
2909         return 0;
2910 }
2911
2912 /* mds_connect copy */
2913 static int mdt_obd_connect(const struct lu_context *ctx,
2914                            struct lustre_handle *conn, struct obd_device *obd,
2915                            struct obd_uuid *cluuid,
2916                            struct obd_connect_data *data)
2917 {
2918         struct mdt_export_data *med;
2919         struct mdt_client_data *mcd;
2920         struct obd_export      *exp;
2921         struct mdt_device      *mdt;
2922         int                     rc;
2923         ENTRY;
2924
2925         LASSERT(ctx != NULL);
2926         if (!conn || !obd || !cluuid)
2927                 RETURN(-EINVAL);
2928
2929         mdt = mdt_dev(obd->obd_lu_dev);
2930
2931         rc = class_connect(conn, obd, cluuid);
2932         if (rc)
2933                 RETURN(rc);
2934
2935         exp = class_conn2export(conn);
2936         LASSERT(exp != NULL);
2937         med = &exp->exp_mdt_data;
2938         
2939         rc = mdt_connect_internal(exp, mdt, data);
2940         if (rc == 0) {
2941                 OBD_ALLOC_PTR(mcd);
2942                 if (mcd != NULL) {
2943                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2944                         med->med_mcd = mcd;
2945                         rc = mdt_client_add(ctx, mdt, med, -1);
2946                         if (rc != 0)
2947                                 OBD_FREE_PTR(mcd);
2948                 } else
2949                         rc = -ENOMEM;
2950         }
2951
2952         if (rc != 0)
2953                 class_disconnect(exp);
2954         else
2955                 class_export_put(exp);
2956
2957         RETURN(rc);
2958 }
2959
2960 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
2961                              struct obd_uuid *cluuid,
2962                              struct obd_connect_data *data)
2963 {
2964         int rc;
2965         ENTRY;
2966
2967         if (exp == NULL || obd == NULL || cluuid == NULL)
2968                 RETURN(-EINVAL);
2969
2970         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
2971
2972         RETURN(rc);
2973 }
2974
2975 static int mdt_obd_disconnect(struct obd_export *exp)
2976 {
2977         int rc;
2978         ENTRY;
2979
2980         LASSERT(exp);
2981         class_export_get(exp);
2982
2983         /* Disconnect early so that clients can't keep using export */
2984         rc = class_disconnect(exp);
2985         //ldlm_cancel_locks_for_export(exp);
2986
2987         /* complete all outstanding replies */
2988         spin_lock(&exp->exp_lock);
2989         while (!list_empty(&exp->exp_outstanding_replies)) {
2990                 struct ptlrpc_reply_state *rs =
2991                         list_entry(exp->exp_outstanding_replies.next,
2992                                    struct ptlrpc_reply_state, rs_exp_list);
2993                 struct ptlrpc_service *svc = rs->rs_service;
2994
2995                 spin_lock(&svc->srv_lock);
2996                 list_del_init(&rs->rs_exp_list);
2997                 ptlrpc_schedule_difficult_reply(rs);
2998                 spin_unlock(&svc->srv_lock);
2999         }
3000         spin_unlock(&exp->exp_lock);
3001
3002         class_export_put(exp);
3003         RETURN(rc);
3004 }
3005
3006 /* FIXME: Can we avoid using these two interfaces? */
3007 static int mdt_init_export(struct obd_export *exp)
3008 {
3009         struct mdt_export_data *med = &exp->exp_mdt_data;
3010         ENTRY;
3011
3012         INIT_LIST_HEAD(&med->med_open_head);
3013         spin_lock_init(&med->med_open_lock);
3014         exp->exp_connecting = 1;
3015         RETURN(0);
3016 }
3017
3018 static int mdt_destroy_export(struct obd_export *export)
3019 {
3020         struct mdt_export_data *med;
3021         struct obd_device *obd = export->exp_obd;
3022         struct mdt_device *mdt;
3023         struct mdt_thread_info *info;
3024         struct lu_context ctxt;
3025         int rc = 0;
3026         ENTRY;
3027
3028         med = &export->exp_mdt_data;
3029
3030         target_destroy_export(export);
3031
3032         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3033                 RETURN(0);
3034
3035         mdt = mdt_dev(obd->obd_lu_dev);
3036         LASSERT(mdt != NULL);
3037
3038         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3039         if (rc)
3040                 RETURN(rc);
3041
3042         lu_context_enter(&ctxt);
3043
3044         info = lu_context_key_get(&ctxt, &mdt_thread_key);
3045         LASSERT(info != NULL);
3046         memset(info, 0, sizeof *info);
3047         /* Close any open files (which may also cause orphan unlinking). */
3048         spin_lock(&med->med_open_lock);
3049         while (!list_empty(&med->med_open_head)) {
3050                 struct list_head *tmp = med->med_open_head.next;
3051                 struct mdt_file_data *mfd =
3052                         list_entry(tmp, struct mdt_file_data, mfd_list);
3053                 struct mdt_object *o = mfd->mfd_object;
3054
3055                 /* Remove mfd handle so it can't be found again.
3056                  * We are consuming the mfd_list reference here. */
3057                 class_handle_unhash(&mfd->mfd_handle);
3058                 list_del_init(&mfd->mfd_list);
3059                 spin_unlock(&med->med_open_lock);
3060                 mdt_mfd_close(&ctxt, mdt, mfd, &info->mti_attr);
3061                 /* TODO: if we close the unlinked file,
3062                  * we need to remove it's objects from OST */
3063                 mdt_object_put(&ctxt, o);
3064                 spin_lock(&med->med_open_lock);
3065         }
3066         spin_unlock(&med->med_open_lock);
3067         mdt_client_free(&ctxt, mdt, med);
3068
3069         lu_context_exit(&ctxt);
3070         lu_context_fini(&ctxt);
3071
3072         RETURN(rc);
3073 }
3074
3075 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
3076                       enum md_upcall_event ev)
3077 {
3078         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3079         struct md_device  *next  = m->mdt_child;
3080         int rc = 0;
3081         ENTRY;
3082
3083         switch (ev) {
3084                 case MD_LOV_SYNC:
3085                         rc = next->md_ops->mdo_get_maxsize(ctx, next,
3086                                         &m->mdt_max_mdsize,
3087                                         &m->mdt_max_cookiesize);
3088                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3089                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
3090                         break;
3091                 default:
3092                         CERROR("invalid event\n");
3093                         rc = -EINVAL;
3094                         break;
3095         }
3096         RETURN(rc);
3097 }
3098
3099 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3100                          void *karg, void *uarg)
3101 {
3102         struct lu_context ctxt;
3103         struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3104         struct dt_device *dt = mdt->mdt_bottom;
3105         int rc;
3106
3107         ENTRY;
3108         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3109         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3110         if (rc)
3111                 RETURN(rc);
3112         lu_context_enter(&ctxt);
3113         if (cmd == OBD_IOC_SYNC || cmd == OBD_IOC_SET_READONLY) {
3114                 rc = dt->dd_ops->dt_sync(&ctxt, dt);
3115                 if (cmd == OBD_IOC_SET_READONLY)
3116                         dt->dd_ops->dt_ro(&ctxt, dt);
3117         } else
3118                 rc = -EOPNOTSUPP;
3119         lu_context_exit(&ctxt);
3120         lu_context_fini(&ctxt);
3121         RETURN(rc);
3122 }
3123
3124 int mdt_postrecov(struct obd_device *obd)
3125 {
3126         struct lu_context ctxt;
3127         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3128         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
3129         int rc;
3130         ENTRY;
3131
3132         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3133         if (rc)
3134                 RETURN(rc);
3135         lu_context_enter(&ctxt);
3136         rc = ld->ld_ops->ldo_recovery_complete(&ctxt, ld);
3137         lu_context_exit(&ctxt);
3138         lu_context_fini(&ctxt);
3139         RETURN(rc);
3140 }
3141
3142 static struct obd_ops mdt_obd_device_ops = {
3143         .o_owner          = THIS_MODULE,
3144         .o_connect        = mdt_obd_connect,
3145         .o_reconnect      = mdt_obd_reconnect,
3146         .o_disconnect     = mdt_obd_disconnect,
3147         .o_init_export    = mdt_init_export,
3148         .o_destroy_export = mdt_destroy_export,
3149         .o_iocontrol      = mdt_iocontrol,
3150         .o_postrecov      = mdt_postrecov
3151
3152 };
3153
3154 static struct lu_device* mdt_device_fini(const struct lu_context *ctx,
3155                                          struct lu_device *d)
3156 {
3157         struct mdt_device *m = mdt_dev(d);
3158
3159         mdt_fini(ctx, m);
3160         RETURN(NULL);
3161 }
3162
/* ldto_device_free method: release the mdt_device that embeds @d. */
static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
{
        struct mdt_device *m;

        m = mdt_dev(d);
        OBD_FREE_PTR(m);
}
3169
3170 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
3171                                           struct lu_device_type *t,
3172                                           struct lustre_cfg *cfg)
3173 {
3174         struct lu_device  *l;
3175         struct mdt_device *m;
3176
3177         OBD_ALLOC_PTR(m);
3178         if (m != NULL) {
3179                 int rc;
3180
3181                 l = &m->mdt_md_dev.md_lu_dev;
3182                 rc = mdt_init0(ctx, m, t, cfg);
3183                 if (rc != 0) {
3184                         OBD_FREE_PTR(m);
3185                         l = ERR_PTR(rc);
3186                         return l;
3187                 }
3188                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
3189         } else
3190                 l = ERR_PTR(-ENOMEM);
3191         return l;
3192 }
3193
3194 /*
3195  * context key constructor/destructor
3196  */
3197 static void *mdt_thread_init(const struct lu_context *ctx,
3198                              struct lu_context_key *key)
3199 {
3200         struct mdt_thread_info *info;
3201
3202         /*
3203          * check that no high order allocations are incurred.
3204          */
3205         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3206         OBD_ALLOC_PTR(info);
3207         if (info == NULL)
3208                 info = ERR_PTR(-ENOMEM);
3209         return info;
3210 }
3211
/* Context key destructor: free the mdt_thread_info passed in @data. */
static void mdt_thread_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *info;

        info = data;
        OBD_FREE_PTR(info);
}
3218
3219 struct lu_context_key mdt_thread_key = {
3220         .lct_tags = LCT_MD_THREAD,
3221         .lct_init = mdt_thread_init,
3222         .lct_fini = mdt_thread_fini
3223 };
3224
3225 static void *mdt_txn_init(const struct lu_context *ctx,
3226                              struct lu_context_key *key)
3227 {
3228         struct mdt_txn_info *txi;
3229
3230         /*
3231          * check that no high order allocations are incurred.
3232          */
3233         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
3234         OBD_ALLOC_PTR(txi);
3235         if (txi == NULL)
3236                 txi = ERR_PTR(-ENOMEM);
3237         return txi;
3238 }
3239
/* Context key destructor: free the mdt_txn_info passed in @data. */
static void mdt_txn_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txi;

        txi = data;
        OBD_FREE_PTR(txi);
}
3246
3247 struct lu_context_key mdt_txn_key = {
3248         .lct_tags = LCT_TX_HANDLE,
3249         .lct_init = mdt_txn_init,
3250         .lct_fini = mdt_txn_fini
3251 };
3252
3253
3254 static int mdt_type_init(struct lu_device_type *t)
3255 {
3256         int rc;
3257
3258         rc = lu_context_key_register(&mdt_thread_key);
3259         if (rc == 0)
3260                 rc = lu_context_key_register(&mdt_txn_key);
3261         return rc;
3262 }
3263
3264 static void mdt_type_fini(struct lu_device_type *t)
3265 {
3266         lu_context_key_degister(&mdt_thread_key);
3267         lu_context_key_degister(&mdt_txn_key);
3268 }
3269
3270 static struct lu_device_type_operations mdt_device_type_ops = {
3271         .ldto_init = mdt_type_init,
3272         .ldto_fini = mdt_type_fini,
3273
3274         .ldto_device_alloc = mdt_device_alloc,
3275         .ldto_device_free  = mdt_device_free,
3276         .ldto_device_fini  = mdt_device_fini
3277 };
3278
3279 static struct lu_device_type mdt_device_type = {
3280         .ldt_tags     = LU_DEVICE_MD,
3281         .ldt_name     = LUSTRE_MDT_NAME,
3282         .ldt_ops      = &mdt_device_type_ops,
3283         .ldt_ctx_tags = LCT_MD_THREAD
3284 };
3285
3286 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
3287         { 0 }
3288 };
3289
3290 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
3291         { 0 }
3292 };
3293
3294 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
3295
3296 static int __init mdt_mod_init(void)
3297 {
3298         int rc;
3299         struct lprocfs_static_vars lvars;
3300
3301         printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
3302         
3303         mdt_num_threads = MDT_NUM_THREADS;
3304         lprocfs_init_vars(mdt, &lvars);
3305         rc = class_register_type(&mdt_obd_device_ops, NULL,
3306                                  lvars.module_vars, LUSTRE_MDT_NAME,
3307                                  &mdt_device_type);
3308         return rc;
3309 }
3310
3311 static void __exit mdt_mod_exit(void)
3312 {
3313         class_unregister_type(LUSTRE_MDT_NAME);
3314 }
3315
3316
/*
 * DEF_HNDL() expands to one designated-initializer element of a
 * struct mdt_handler array.  The element index is the opcode
 * prefix_opc relative to prefix_base; the fail id is
 * OBD_FAIL_<prefix>_<opc><suffix>.
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)                \
        [prefix ## _ ## opc - prefix ## _ ## base] = {                     \
                .mh_name    = #opc,                                        \
                .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,  \
                .mh_opc     = prefix ## _  ## opc,                         \
                .mh_flags   = flags,                                       \
                .mh_act     = fn,                                          \
                .mh_fmt     = fmt                                          \
        }

/*
 * Request with an explicitly supplied format: MDS, SEQ and FLD flavours.
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)

#define DEF_SEQ_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)

#define DEF_FLD_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)

/*
 * Request with a format known in advance: the format object is derived
 * from the opcode name.
 *
 * NOTE(review): DEF_FLD_HNDL_F pastes RQF_SEQ_, the same prefix as
 * DEF_SEQ_HNDL_F -- confirm this is intended and not meant to be RQF_FLD_.
 */
#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)

#define DEF_SEQ_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)

#define DEF_FLD_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)

/*
 * Request with a format we do not yet know: mh_fmt is NULL.
 */
#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
3351
3352 static struct mdt_handler mdt_mds_ops[] = {
3353 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
3354 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
3355 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
3356 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR,      mdt_getattr),
3357 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
3358 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
3359                                           SETXATTR,     mdt_setxattr),
3360 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
3361 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
3362 DEF_MDT_HNDL_F(0                        |MUTABOR,
3363                                           REINT,        mdt_reint),
3364 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
3365 DEF_MDT_HNDL_0(0,                         DONE_WRITING, mdt_done_writing),
3366 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
3367 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
3368 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
3369 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
3370 };
3371
/*
 * OBD-family entry: forwards to DEF_HNDL() with the OBD prefix, PING as the
 * base opcode name, the _NET suffix and NULL as the request format.
 */
#define DEF_OBD_HNDL(flags, name, fn)   \
        DEF_HNDL(OBD, PING, _NET,       \
                 flags, name, fn, NULL)
3374
3375
3376 static struct mdt_handler mdt_obd_ops[] = {
3377         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
3378         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
3379         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
3380 };
3381
/*
 * LDLM-family entries. Both forward to DEF_HNDL() with the LDLM prefix,
 * ENQUEUE as the base opcode name and an empty suffix argument.
 *
 *  - DEF_DLM_HNDL_0: NULL is passed as the request format;
 *  - DEF_DLM_HNDL_F: the format is the address of RQF_LDLM_<name>.
 */
#define DEF_DLM_HNDL_0(flags, name, fn) \
        DEF_HNDL(LDLM, ENQUEUE, ,       \
                 flags, name, fn, NULL)

#define DEF_DLM_HNDL_F(flags, name, fn) \
        DEF_HNDL(LDLM, ENQUEUE, ,       \
                 flags, name, fn, &RQF_LDLM_ ## name)
3386
3387 static struct mdt_handler mdt_dlm_ops[] = {
3388         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
3389         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
3390         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
3391         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
3392 };
3393
3394 static struct mdt_handler mdt_llog_ops[] = {
3395 };
3396
3397 static struct mdt_opc_slice mdt_regular_handlers[] = {
3398         {
3399                 .mos_opc_start = MDS_GETATTR,
3400                 .mos_opc_end   = MDS_LAST_OPC,
3401                 .mos_hs        = mdt_mds_ops
3402         },
3403         {
3404                 .mos_opc_start = OBD_PING,
3405                 .mos_opc_end   = OBD_LAST_OPC,
3406                 .mos_hs        = mdt_obd_ops
3407         },
3408         {
3409                 .mos_opc_start = LDLM_ENQUEUE,
3410                 .mos_opc_end   = LDLM_LAST_OPC,
3411                 .mos_hs        = mdt_dlm_ops
3412         },
3413         {
3414                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
3415                 .mos_opc_end   = LLOG_LAST_OPC,
3416                 .mos_hs        = mdt_llog_ops
3417         },
3418         {
3419                 .mos_hs        = NULL
3420         }
3421 };
3422
3423 static struct mdt_handler mdt_readpage_ops[] = {
3424         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
3425 #ifdef HAVE_SPLIT_SUPPORT
3426         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
3427 #endif
3428
3429         /*
3430          * XXX: this is ugly and should be fixed one day, see mdc_close() for
3431          * detailed comments. --umka
3432          */
3433         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
3434 };
3435
3436 static struct mdt_opc_slice mdt_readpage_handlers[] = {
3437         {
3438                 .mos_opc_start = MDS_GETATTR,
3439                 .mos_opc_end   = MDS_LAST_OPC,
3440                 .mos_hs        = mdt_readpage_ops
3441         },
3442         {
3443                 .mos_hs        = NULL
3444         }
3445 };
3446
3447 static struct mdt_handler mdt_seq_ops[] = {
3448         DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
3449 };
3450
3451 static struct mdt_opc_slice mdt_seq_handlers[] = {
3452         {
3453                 .mos_opc_start = SEQ_QUERY,
3454                 .mos_opc_end   = SEQ_LAST_OPC,
3455                 .mos_hs        = mdt_seq_ops
3456         },
3457         {
3458                 .mos_hs        = NULL
3459         }
3460 };
3461
3462 static struct mdt_handler mdt_fld_ops[] = {
3463         DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
3464 };
3465
3466 static struct mdt_opc_slice mdt_fld_handlers[] = {
3467         {
3468                 .mos_opc_start = FLD_QUERY,
3469                 .mos_opc_end   = FLD_LAST_OPC,
3470                 .mos_hs        = mdt_fld_ops
3471         },
3472         {
3473                 .mos_hs        = NULL
3474         }
3475 };
3476
3477 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3478 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
3479 MODULE_LICENSE("GPL");
3480
3481 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
3482                 "number of mdt service threads to start");
3483
3484 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);