Whamcloud - gitweb
- removed crow test from replay-single;
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * MDT_FAIL_CHECK
46  */
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
53 #include <obd.h>
54 /* lu2dt_dev() */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
60 /*
61  * Initialized in mdt_mod_init().
62  */
63 unsigned long mdt_num_threads;
64
65 /* ptlrpc request handler for MDT. All handlers are
66  * grouped into several slices - struct mdt_opc_slice,
67  * and stored in an array - mdt_handlers[].
68  */
69 struct mdt_handler {
70         /* The name of this handler. */
71         const char *mh_name;
72         /* Fail id for this handler, checked at the beginning of this handler*/
73         int         mh_fail_id;
74         /* Operation code for this handler */
75         __u32       mh_opc;
76         /* flags are listed in enum mdt_handler_flags below. */
77         __u32       mh_flags;
78         /* The actual handler function to execute. */
79         int (*mh_act)(struct mdt_thread_info *info);
80         /* Request format for this request. */
81         const struct req_format *mh_fmt;
82 };
83
/*
 * Per-handler flags, stored as a bit mask in mdt_handler::mh_flags.
 */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         */
        HABEO_CORPUS = 1 << 0,
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = 1 << 1,
        /*
         * "habeo refero" == "I have a reply"
         *
         * The reply format of this request is fixed, so generic code is
         * able to pack the reply message.
         */
        HABEO_REFERO = 1 << 2,
        /*
         * "mutabor" == "I shall modify"
         *
         * The request modifies something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client as early as
         * possible.
         */
        MUTABOR      = 1 << 3
};
113
114 struct mdt_opc_slice {
115         __u32               mos_opc_start;
116         int                 mos_opc_end;
117         struct mdt_handler *mos_hs;
118 };
119
120 static struct mdt_opc_slice mdt_regular_handlers[];
121 static struct mdt_opc_slice mdt_readpage_handlers[];
122 static struct mdt_opc_slice mdt_seq_handlers[];
123 static struct mdt_opc_slice mdt_fld_handlers[];
124
125 static struct mdt_device *mdt_dev(struct lu_device *d);
126 static int mdt_regular_handle(struct ptlrpc_request *req);
127 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
128
129 static struct lu_object_operations mdt_obj_ops;
130
131 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
132 {
133         if (!rep)
134                 return 0;
135         return (rep->lock_policy_res1 & flag);
136 }
137
138 void mdt_clear_disposition(struct mdt_thread_info *info,
139                            struct ldlm_reply *rep, int flag)
140 {
141         if (info)
142                 info->mti_opdata &= ~flag;
143         if (rep)
144                 rep->lock_policy_res1 &= ~flag;
145 }
146
147 void mdt_set_disposition(struct mdt_thread_info *info,
148                          struct ldlm_reply *rep, int flag)
149 {
150         if (info)
151                 info->mti_opdata |= flag;
152         if (rep)
153                 rep->lock_policy_res1 |= flag;
154 }
155
156 static int mdt_is_remote_object(struct mdt_object *o)
157 {
158        return (o->mot_header.loh_attr & LOHA_REMOTE); 
159 }        
160
161
162 static int mdt_getstatus(struct mdt_thread_info *info)
163 {
164         struct md_device *next  = info->mti_mdt->mdt_child;
165         int               rc;
166         struct mdt_body  *body;
167
168         ENTRY;
169
170         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
171                 rc = -ENOMEM;
172         else {
173                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
174                 rc = next->md_ops->mdo_root_get(info->mti_ctxt,
175                                                 next, &body->fid1);
176                 if (rc == 0)
177                         body->valid |= OBD_MD_FLID;
178         }
179
180         RETURN(rc);
181 }
182
183 static int mdt_statfs(struct mdt_thread_info *info)
184 {
185         struct md_device  *next  = info->mti_mdt->mdt_child;
186         struct obd_statfs *osfs;
187         int                rc;
188
189         ENTRY;
190
191         /* This will trigger a watchdog timeout */
192         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
193                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
194
195
196         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
197                 rc = -ENOMEM;
198         } else {
199                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
200                 /* XXX max_age optimisation is needed here. See mds_statfs */
201                 rc = next->md_ops->mdo_statfs(info->mti_ctxt,
202                                               next, &info->mti_u.ksfs);
203                 statfs_pack(osfs, &info->mti_u.ksfs);
204         }
205
206         RETURN(rc);
207 }
208
209 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
210                         struct mdt_object *o)
211 {
212         /* Check if Size-on-MDS is enabled. */
213         if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
214                 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
215                 b->size = attr->la_size;
216                 b->blocks = attr->la_blocks;
217         }
218 }
219
220 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
221                         const struct lu_fid *fid)
222 {
223         /*XXX should pack the reply body according to lu_valid*/
224         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
225                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
226                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
227                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
228
229         if (!S_ISREG(attr->la_mode))
230                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
231         
232         b->atime      = attr->la_atime;
233         b->mtime      = attr->la_mtime;
234         b->ctime      = attr->la_ctime;
235         b->mode       = attr->la_mode;
236         b->size       = attr->la_size;
237         b->blocks     = attr->la_blocks;
238         b->uid        = attr->la_uid;
239         b->gid        = attr->la_gid;
240         b->flags      = attr->la_flags;
241         b->nlink      = attr->la_nlink;
242         b->rdev       = attr->la_rdev;
243
244         if (fid) {
245                 b->fid1 = *fid;
246                 b->valid |= OBD_MD_FLID;
247                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
248                                 PFID(fid), b->nlink, b->mode, b->size);
249         }
250 }
251
252 static inline int mdt_body_has_lov(const struct lu_attr *la,
253                                    const struct mdt_body *body)
254 {
255         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
256                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
257 }
258
259 static int mdt_getattr_internal(struct mdt_thread_info *info,
260                                 struct mdt_object *o)
261 {
262         struct md_object        *next = mdt_object_child(o);
263         const struct mdt_body   *reqbody = info->mti_body;
264         struct ptlrpc_request   *req = mdt_info_req(info);
265         struct md_attr          *ma = &info->mti_attr;
266         struct lu_attr          *la = &ma->ma_attr;
267         struct req_capsule      *pill = &info->mti_pill;
268         const struct lu_context *ctxt = info->mti_ctxt;
269         struct mdt_body         *repbody;
270         void                    *buffer;
271         int                     length;
272         int                     rc;
273         ENTRY;
274
275         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
276                 RETURN(-ENOMEM);
277
278         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
279         repbody->eadatasize = 0;
280         repbody->aclsize = 0;
281
282         if (reqbody->valid & OBD_MD_MEA) {
283                 /* Assumption: MDT_MD size is enough for lmv size FIXME */
284                 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
285                 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD, 
286                                                              RCL_SERVER);
287                 ma->ma_need = MA_INODE | MA_LMV;
288         } else {
289                 ma->ma_need = MA_INODE | MA_LOV ;
290                 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
291                 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
292                                                              RCL_SERVER);
293         }
294         rc = mo_attr_get(ctxt, next, ma);
295         if (rc == -EREMOTE) {
296                 /* This object is located on remote node.*/
297                 repbody->fid1 = *mdt_object_fid(o);
298                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
299                 RETURN(0);
300         } else if (rc) {
301                 CERROR("getattr error for "DFID": %d\n",
302                         PFID(mdt_object_fid(o)), rc);
303                 RETURN(rc);
304         }
305
306         if (ma->ma_valid & MA_INODE)
307                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
308         else
309                 RETURN(-EFAULT);
310
311         if (mdt_body_has_lov(la, reqbody)) {
312                 if (ma->ma_valid & MA_LOV) {
313                         LASSERT(ma->ma_lmm_size);
314                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
315                         repbody->eadatasize = ma->ma_lmm_size;
316                         if (S_ISDIR(la->la_mode))
317                                 repbody->valid |= OBD_MD_FLDIREA;
318                         else
319                                 repbody->valid |= OBD_MD_FLEASIZE;
320                 }
321                 if (ma->ma_valid & MA_LMV) {
322                         LASSERT(S_ISDIR(la->la_mode));
323                         repbody->eadatasize = ma->ma_lmv_size;
324                         repbody->valid |= OBD_MD_FLDIREA;
325                         repbody->valid |= OBD_MD_MEA;
326                 }
327         } else if (S_ISLNK(la->la_mode) &&
328                           reqbody->valid & OBD_MD_LINKNAME) {
329                 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
330                 if (rc <= 0) {
331                         CERROR("readlink failed: %d\n", rc);
332                         rc = -EFAULT;
333                 } else {
334                         repbody->valid |= OBD_MD_LINKNAME;
335                         repbody->eadatasize = rc + 1;
336                         ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
337                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
338                                         (char*)ma->ma_lmm, rc);
339                         rc = 0;
340                 }
341         }
342
343         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
344                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
345                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
346                 repbody->valid |= OBD_MD_FLMODEASIZE;
347                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
348                                 "MAX_COOKIE to : %d:%d\n",
349                                 repbody->max_mdsize,
350                                 repbody->max_cookiesize);
351         }
352
353 #ifdef CONFIG_FS_POSIX_ACL
354         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
355             (reqbody->valid & OBD_MD_FLACL)) {
356                 buffer = req_capsule_server_get(pill, &RMF_ACL);
357                 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
358                 if (length > 0) {
359                         rc = mo_xattr_get(ctxt, next, buffer,
360                                           length, XATTR_NAME_ACL_ACCESS);
361                         if (rc < 0) {
362                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
363                                         rc = 0;
364                                 else
365                                         CERROR("got acl size: %d\n", rc);
366                         } else {
367                                 repbody->aclsize = rc;
368                                 repbody->valid |= OBD_MD_FLACL;
369                         }
370                 }
371         }
372 #endif
373
374         RETURN(rc);
375 }
376
377 static int mdt_getattr(struct mdt_thread_info *info)
378 {
379         int rc;
380         struct mdt_object *obj;
381
382         obj = info->mti_object;
383         LASSERT(obj != NULL);
384         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
385         ENTRY;
386
387         rc = mdt_getattr_internal(info, obj);
388         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
389         RETURN(rc);
390 }
391
392 static int mdt_is_subdir(struct mdt_thread_info *info)
393 {
394         struct mdt_object   *obj = info->mti_object;
395         struct req_capsule  *pill = &info->mti_pill;
396         struct mdt_body     *repbody;
397         int                  rc;
398
399         obj = info->mti_object;
400         LASSERT(obj != NULL);
401         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
402         ENTRY;
403
404         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
405
406         /*
407          * We save last checked parent fid to @repbody->fid1 for remote
408          * directory case.
409          */
410         rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(obj),
411                            &info->mti_tmp_fid2, &repbody->fid1);
412         if (rc < 0)
413                 RETURN(rc);
414         
415         /* 
416          * Save error code to ->mode. Later it it is used for detecting the case
417          * of remote subdir.
418          */
419         repbody->mode = rc;
420         repbody->valid = OBD_MD_FLMODE;
421         
422         if (rc == EREMOTE)
423                 repbody->valid |= OBD_MD_FLID;
424
425         
426         RETURN(0);
427 }
428
429 /*
430  * UPDATE lock should be taken against parent, and be release before exit;
431  * child_bits lock should be taken against child, and be returned back:
432  *            (1)normal request should release the child lock;
433  *            (2)intent request will grant the lock to client.
434  */
435 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
436                                  struct mdt_lock_handle *lhc,
437                                  __u64 child_bits,
438                                  struct ldlm_reply *ldlm_rep)
439 {
440         struct ptlrpc_request *req = mdt_info_req(info);
441         struct mdt_object     *parent = info->mti_object;
442         struct mdt_object     *child;
443         struct md_object      *next = mdt_object_child(info->mti_object);
444         struct lu_fid         *child_fid = &info->mti_tmp_fid1;
445         int                    is_resent, rc;
446         const char            *name;
447         struct mdt_lock_handle *lhp;
448         struct ldlm_lock      *lock;
449         ENTRY;
450
451         is_resent = lustre_handle_is_used(&lhc->mlh_lh);
452         if (is_resent)
453                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
454         
455         LASSERT(info->mti_object != NULL);
456         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
457         if (name == NULL)
458                 RETURN(-EFAULT);
459
460         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
461                         PFID(mdt_object_fid(parent)), name, ldlm_rep);
462
463         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
464         if (strlen(name) == 0) {
465                 /* only getattr on the child. parent is on another node. */
466                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
467                 child = parent;
468                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
469                        ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
470
471                 if (is_resent) {
472                         /* Do not take lock for resent case. */
473                         lock = ldlm_handle2lock(&lhc->mlh_lh);
474                         if (!lock) {
475                                 CERROR("Invalid lock handle "LPX64"\n",
476                                        lhc->mlh_lh.cookie);
477                                 LBUG();
478                         }
479                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
480                                                 &lock->l_resource->lr_name));
481                         LDLM_LOCK_PUT(lock);
482                         rc = 0;
483                 } else {
484                         mdt_lock_handle_init(lhc);
485                         lhc->mlh_mode = LCK_CR;
486                         
487                         /*
488                          * Object's name is on another MDS, no lookup lock is
489                          * needed here but update is.
490                          */
491                         child_bits &= ~MDS_INODELOCK_LOOKUP;
492                         child_bits |= MDS_INODELOCK_UPDATE;
493                         rc = mdt_object_lock(info, child, lhc, child_bits);
494                 }
495                 if (rc == 0) {
496                         /* Finally, we can get attr for child. */
497                         rc = mdt_getattr_internal(info, child);
498                         if (rc != 0)
499                                 mdt_object_unlock(info, child, lhc, 1);
500                 }
501                 GOTO(out, rc);
502         }
503
504         /*step 1: lock parent */
505         lhp = &info->mti_lh[MDT_LH_PARENT];
506         lhp->mlh_mode = LCK_CR;
507         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
508         if (rc != 0)
509                 RETURN(rc);
510
511         /*step 2: lookup child's fid by name */
512         rc = mdo_lookup(info->mti_ctxt, next, name, child_fid);
513         if (rc != 0) {
514                 if (rc == -ENOENT)
515                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
516                 GOTO(out_parent, rc);
517         } else
518                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
519         /*
520          *step 3: find the child object by fid & lock it.
521          *        regardless if it is local or remote.
522          */
523         child = mdt_object_find(info->mti_ctxt, info->mti_mdt, child_fid);
524         if (IS_ERR(child))
525                 GOTO(out_parent, rc = PTR_ERR(child));
526         if (is_resent) {
527                 /* Do not take lock for resent case. */
528                 lock = ldlm_handle2lock(&lhc->mlh_lh);
529                 if (!lock) {
530                         CERROR("Invalid lock handle "LPX64"\n",
531                                lhc->mlh_lh.cookie);
532                         LBUG();
533                 }
534                 LASSERT(fid_res_name_eq(child_fid,
535                                         &lock->l_resource->lr_name));
536                 LDLM_LOCK_PUT(lock);
537         } else {
538                 mdt_lock_handle_init(lhc);
539                 lhc->mlh_mode = LCK_CR;
540                 rc = mdt_object_cr_lock(info, child, lhc, child_bits);
541                 if (rc != 0)
542                         GOTO(out_child, rc);
543         }
544
545         /* finally, we can get attr for child. */
546         rc = mdt_getattr_internal(info, child);
547         if (rc != 0) {
548                 mdt_object_unlock(info, child, lhc, 1);
549         } else {
550                 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
551                 if (lock) {
552                         struct ldlm_res_id *res_id;
553                         struct mdt_body *repbody;
554                         struct lu_attr *ma;
555                         
556                         /* Debugging code. */
557                         res_id = &lock->l_resource->lr_name;
558                         LDLM_DEBUG(lock, "we will return this lock client\n");
559                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
560                                                  &lock->l_resource->lr_name),
561                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
562                                 (unsigned long)res_id->name[0],
563                                 (unsigned long)res_id->name[1],
564                                 (unsigned long)res_id->name[2],
565                                 PFID(mdt_object_fid(child)));
566                         
567                         /* Pack Size-on-MDS inode attributes to the body if
568                          * update lock is given. */
569                         repbody = req_capsule_server_get(&info->mti_pill, 
570                                                          &RMF_MDT_BODY);
571                         ma = &info->mti_attr.ma_attr;
572                         if (lock->l_policy_data.l_inodebits.bits &
573                             MDS_INODELOCK_UPDATE)
574                                 mdt_pack_size2body(repbody, ma, child);
575                         LDLM_LOCK_PUT(lock);
576                 }
577                 
578
579         }
580         EXIT;
581 out_child:
582         mdt_object_put(info->mti_ctxt, child);
583 out_parent:
584         mdt_object_unlock(info, parent, lhp, 1);
585 out:
586         return rc;
587 }
588
589 /* normal handler: should release the child lock */
590 static int mdt_getattr_name(struct mdt_thread_info *info)
591 {
592         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
593         int rc;
594
595         ENTRY;
596
597         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
598         if (lustre_handle_is_used(&lhc->mlh_lh)) {
599                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
600                 lhc->mlh_lh.cookie = 0;
601         }
602         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
603         RETURN(rc);
604 }
605
606 static struct lu_device_operations mdt_lu_ops;
607
608 static int lu_device_is_mdt(struct lu_device *d)
609 {
610         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
611 }
612
613 static inline struct mdt_device *mdt_dev(struct lu_device *d)
614 {
615         LASSERT(lu_device_is_mdt(d));
616         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
617 }
618
619 static int mdt_connect(struct mdt_thread_info *info)
620 {
621         int rc;
622         struct ptlrpc_request *req;
623
624         req = mdt_info_req(info);
625         rc = target_handle_connect(req, mdt_regular_handle);
626         if (rc == 0) {
627                 LASSERT(req->rq_export != NULL);
628                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
629         }
630         return rc;
631 }
632
/*
 * Handler: process a disconnect request; simply forwards the request to
 * target_handle_disconnect() and returns its result.
 */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        struct ptlrpc_request *req = mdt_info_req(info);

        return target_handle_disconnect(req);
}
637
638 static int mdt_sendpage(struct mdt_thread_info *info,
639                         struct lu_rdpg *rdpg)
640 {
641         struct ptlrpc_request   *req = mdt_info_req(info);
642         struct ptlrpc_bulk_desc *desc;
643         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
644         int                      tmpcount;
645         int                      tmpsize;
646         int                      i;
647         int                      rc;
648         ENTRY;
649
650         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
651                                     MDS_BULK_PORTAL);
652         if (desc == NULL)
653                 GOTO(out, rc = -ENOMEM);
654
655         for (i = 0, tmpcount = rdpg->rp_count;
656                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
657                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
658                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
659         }
660
661         LASSERT(desc->bd_nob == rdpg->rp_count);
662         rc = ptlrpc_start_bulk_transfer(desc);
663         if (rc)
664                 GOTO(free_desc, rc);
665
666         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
667                 GOTO(abort_bulk, rc);
668
669         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
670         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
671         LASSERT (rc == 0 || rc == -ETIMEDOUT);
672
673         if (rc == 0) {
674                 if (desc->bd_success &&
675                     desc->bd_nob_transferred == rdpg->rp_count)
676                         GOTO(free_desc, rc);
677
678                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
679         }
680
681         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
682                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
683                   desc->bd_nob_transferred, rdpg->rp_count,
684                   req->rq_export->exp_client_uuid.uuid,
685                   req->rq_export->exp_connection->c_remote_uuid.uuid);
686
687         class_fail_export(req->rq_export);
688
689         EXIT;
690 abort_bulk:
691         ptlrpc_abort_bulk(desc);
692 free_desc:
693         ptlrpc_free_bulk(desc);
694 out:
695         return rc;
696 }
697
698 #ifdef HAVE_SPLIT_SUPPORT
699 /*
700  * Retrieve dir entry from the page and insert it to the
701  * slave object, actually, this should be in osd layer,
702  * but since it will not in the final product, so just do
703  * it here and do not define more moo api anymore for
704  * this.
705  */
706 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
707 {
708         struct mdt_object *object = info->mti_object;
709         struct lu_dirpage *dp;
710         struct lu_dirent *ent;
711         int rc = 0;
712
713
714         /* Disable trans for this name insert, since it will 
715          * include many trans for this */
716         info->mti_no_need_trans = 1;
717         kmap(page);
718         dp = page_address(page);
719         for (ent = lu_dirent_start(dp); ent != NULL;
720                           ent = lu_dirent_next(ent)) {
721                 struct lu_fid *lf = &ent->lde_fid;
722                 
723                 /* FIXME: multi-trans for this name insert */
724                 if (strncmp(ent->lde_name, ".", ent->lde_namelen) && 
725                     strncmp(ent->lde_name, "..", ent->lde_namelen)) {
726                         char *name;
727                         /* FIXME: Here we allocate name for each name,
728                          * maybe stupid, but can not find better way.
729                          * will find better way */
730                         OBD_ALLOC(name, ent->lde_namelen + 1);
731                         memcpy(name, ent->lde_name, ent->lde_namelen);
732                         rc = mdo_name_insert(info->mti_ctxt,
733                                              md_object_next(&object->mot_obj),
734                                              name, lf, 0);
735                         OBD_FREE(name, ent->lde_namelen + 1);
736                         if (rc)
737                                 GOTO(out, rc);
738                 }
739         }
740 out:
741         kunmap(page);
742         RETURN(rc);
743 }
744
745 static int mdt_bulk_timeout(void *data)
746 {
747         ENTRY;
748         
749         CERROR("mdt bulk transfer timeout \n");
750         
751         RETURN(1);
752 }
753
754 static int mdt_writepage(struct mdt_thread_info *info)
755 {
756         struct ptlrpc_request   *req = mdt_info_req(info);
757         struct mdt_body         *reqbody;
758         struct l_wait_info      *lwi;
759         struct ptlrpc_bulk_desc *desc;
760         struct page             *page;
761         int                rc;
762         ENTRY;
763
764         
765         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
766         if (reqbody == NULL)
767                 RETURN(-EFAULT);
768
769         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
770         if (!desc)
771                 RETURN(-ENOMEM);
772
773         /* allocate the page for the desc */
774         page = alloc_pages(GFP_KERNEL, 0);
775         if (!page)
776                 GOTO(desc_cleanup, rc = -ENOMEM);
777
778         CDEBUG(D_INFO, "Received page offset %d size %d \n", 
779                         (int)reqbody->size, (int)reqbody->nlink);
780
781         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size, 
782                               (int)reqbody->nlink);
783
784         /* FIXME: following parts are copied from ost_brw_write */
785
786         /* Check if client was evicted while we were doing i/o before touching
787            network */
788         OBD_ALLOC_PTR(lwi);
789         if (!lwi)
790                 GOTO(cleanup_page, rc = -ENOMEM);
791
792         if (desc->bd_export->exp_failed)
793                 rc = -ENOTCONN;
794         else
795                 rc = ptlrpc_start_bulk_transfer (desc);
796         if (rc == 0) {
797                 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
798                                             mdt_bulk_timeout, desc);
799                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
800                                   desc->bd_export->exp_failed, lwi);
801                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
802                 if (rc == -ETIMEDOUT) {
803                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
804                         ptlrpc_abort_bulk(desc);
805                 } else if (desc->bd_export->exp_failed) {
806                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
807                         rc = -ENOTCONN;
808                         ptlrpc_abort_bulk(desc);
809                 } else if (!desc->bd_success ||
810                            desc->bd_nob_transferred != desc->bd_nob) {
811                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
812                                   desc->bd_success ?
813                                   "truncated" : "network error on",
814                                   desc->bd_nob_transferred, desc->bd_nob);
815                         /* XXX should this be a different errno? */
816                         rc = -ETIMEDOUT;
817                 }
818         } else {
819                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
820         }
821         if (rc)
822                 GOTO(cleanup_lwi, rc);
823         rc = mdt_write_dir_page(info, page);
824
825 cleanup_lwi:
826         OBD_FREE_PTR(lwi);
827 cleanup_page:
828         __free_pages(page, 0);
829 desc_cleanup:
830         ptlrpc_free_bulk(desc);
831         RETURN(rc);
832 }
833 #endif
834
835 static int mdt_readpage(struct mdt_thread_info *info)
836 {
837         struct mdt_object *object = info->mti_object;
838         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
839         struct mdt_body   *reqbody;
840         struct mdt_body   *repbody;
841         int                rc, rc1 = 0;
842         int                i;
843         ENTRY;
844
845         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
846                 RETURN(-ENOMEM);
847
848         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
849         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
850         if (reqbody == NULL || repbody == NULL)
851                 RETURN(-EFAULT);
852
853         /*
854          * prepare @rdpg before calling lower layers and transfer itself. Here
855          * reqbody->size contains offset of where to start to read and
856          * reqbody->nlink contains number bytes to read.
857          */
858         rdpg->rp_hash = reqbody->size;
859         if ((__u64)rdpg->rp_hash != reqbody->size) {
860                 CERROR("Invalid hash: %#llx != %#llx\n",
861                        (__u64)rdpg->rp_hash, reqbody->size);
862                 RETURN(-EFAULT);
863         }
864         rdpg->rp_count  = reqbody->nlink;
865         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
866         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
867         if (rdpg->rp_pages == NULL)
868                 RETURN(-ENOMEM);
869
870         for (i = 0; i < rdpg->rp_npages; ++i) {
871                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
872                 if (rdpg->rp_pages[i] == NULL)
873                         GOTO(free_rdpg, rc = -ENOMEM);
874         }
875
876         /* call lower layers to fill allocated pages with directory data */
877         rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
878         if (rc) {
879                 if (rc == -ERANGE)
880                         rc1 = rc;
881                 else 
882                         GOTO(free_rdpg, rc);
883         }
884
885         /* send pages to client */
886         rc = mdt_sendpage(info, rdpg);
887
888         EXIT;
889 free_rdpg:
890         
891         for (i = 0; i < rdpg->rp_npages; i++)
892                 if (rdpg->rp_pages[i] != NULL)
893                         __free_pages(rdpg->rp_pages[i], 0);
894         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
895
896         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
897
898         return rc ? rc : rc1;
899 }
900
901 static int mdt_reint_internal(struct mdt_thread_info *info,
902                               struct mdt_lock_handle *lhc,
903                               __u32 op)
904 {
905         struct req_capsule      *pill = &info->mti_pill;
906         struct mdt_device       *mdt = info->mti_mdt;
907         struct ptlrpc_request   *req = mdt_info_req(info);
908         int                      rc;
909         ENTRY;
910
911         rc = mdt_reint_unpack(info, op);
912         if (rc != 0) {
913                 CERROR("Can't unpack reint, rc %d\n", rc);
914                 RETURN(rc);
915         }
916
917         /* pack reply */
918         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
919                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
920                                      mdt->mdt_max_mdsize);
921         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
922                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
923                                      mdt->mdt_max_cookiesize);
924         rc = req_capsule_pack(pill);
925         if (rc != 0) {
926                 CERROR("Can't pack response, rc %d\n", rc);
927                 RETURN(rc);
928         }
929
930         /*
931          * Check this after packing response, because after we fail here without
932          * allocating response, caller anyway may want to get ldlm_reply from it
933          * and will get oops.
934          */
935         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
936                 RETURN(-EFAULT);
937         
938         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
939                 struct mdt_client_data *mcd;
940
941                 mcd = req->rq_export->exp_mdt_data.med_mcd;
942                 if (mcd->mcd_last_xid == req->rq_xid) {
943                         mdt_reconstruct(info, lhc);
944                         RETURN(lustre_msg_get_status(req->rq_repmsg));
945                 }
946                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
947                           mcd->mcd_last_xid);
948         }
949         rc = mdt_reint_rec(info, lhc);
950
951         RETURN(rc);
952 }
953
954 static long mdt_reint_opcode(struct mdt_thread_info *info,
955                              const struct req_format **fmt)
956 {
957         __u32 *ptr;
958         long opc;
959
960         opc = -EFAULT;
961         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
962         if (ptr != NULL) {
963                 opc = *ptr;
964                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
965                 if (opc < REINT_MAX && fmt[opc] != NULL)
966                         req_capsule_extend(&info->mti_pill, fmt[opc]);
967                 else
968                         CERROR("Unsupported opc: %ld\n", opc);
969         }
970         return opc;
971 }
972
973 static int mdt_reint(struct mdt_thread_info *info)
974 {
975         long opc;
976         int  rc;
977
978         static const struct req_format *reint_fmts[REINT_MAX] = {
979                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
980                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
981                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
982                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
983                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
984                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
985         };
986
987         ENTRY;
988
989         opc = mdt_reint_opcode(info, reint_fmts);
990         if (opc >= 0) {
991                 /* 
992                  * No lock possible here from client to pass it to reint code
993                  * path.
994                  */
995                 rc = mdt_reint_internal(info, NULL, opc);
996         } else
997                 rc = opc;
998
999         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1000         RETURN(rc);
1001 }
1002
/* TODO: the following two methods are not implemented yet. */
1004
/*
 * Placeholder for syncing the whole device: currently does nothing and
 * reports success.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
1010
/*
 * Placeholder for syncing a single object: currently does nothing and
 * reports success.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
1016
1017 static int mdt_sync(struct mdt_thread_info *info)
1018 {
1019         struct req_capsule *pill = &info->mti_pill;
1020         struct mdt_body *body;
1021         int rc;
1022         ENTRY;
1023
1024         /* The fid may be zero, so we req_capsule_set manually */
1025         req_capsule_set(pill, &RQF_MDS_SYNC);
1026
1027         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1028         if (body == NULL)
1029                 RETURN(-EINVAL);
1030
1031         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1032                 RETURN(-ENOMEM);
1033
1034         if (fid_seq(&body->fid1) == 0) {
1035                 /* sync the whole device */
1036                 rc = req_capsule_pack(pill);
1037                 if (rc == 0)
1038                         rc = mdt_device_sync(info);
1039         } else {
1040                 /* sync an object */
1041                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1042                 if (rc == 0) {
1043                         rc = mdt_object_sync(info);
1044                         if (rc == 0) {
1045                                 struct md_object *next;
1046                                 const struct lu_fid *fid;
1047                                 struct lu_attr *la = &info->mti_attr.ma_attr;
1048                                 
1049                                 next = mdt_object_child(info->mti_object);
1050                                 info->mti_attr.ma_need = MA_INODE;
1051                                 rc = mo_attr_get(info->mti_ctxt, next,
1052                                                  &info->mti_attr);
1053                                 if (rc == 0) {
1054                                         body = req_capsule_server_get(pill,
1055                                                                 &RMF_MDT_BODY);
1056                                         fid = mdt_object_fid(info->mti_object);
1057                                         mdt_pack_attr2body(body, la, fid);
1058                                 }
1059                         }
1060                 }
1061         }
1062         RETURN(rc);
1063 }
1064
/* Quotacheck requests are not supported: always fails with -EOPNOTSUPP. */
static int mdt_quotacheck_handle(struct mdt_thread_info *info)
{
        int rc = -EOPNOTSUPP;

        return rc;
}
1069
/* Quotactl requests are not supported: always fails with -EOPNOTSUPP. */
static int mdt_quotactl_handle(struct mdt_thread_info *info)
{
        int rc = -EOPNOTSUPP;

        return rc;
}
1074
1075 /*
1076  * OBD PING and other handlers.
1077  */
1078 static int mdt_obd_ping(struct mdt_thread_info *info)
1079 {
1080         int rc;
1081         ENTRY;
1082         rc = target_handle_ping(mdt_info_req(info));
1083         RETURN(rc);
1084 }
1085
/* Log cancel requests are not supported: always fails with -EOPNOTSUPP. */
static int mdt_obd_log_cancel(struct mdt_thread_info *info)
{
        int rc = -EOPNOTSUPP;

        return rc;
}
1090
/* Quotacheck callbacks are not supported: always fails with -EOPNOTSUPP. */
static int mdt_obd_qc_callback(struct mdt_thread_info *info)
{
        int rc = -EOPNOTSUPP;

        return rc;
}
1095
1096
1097 /*
1098  * DLM handlers.
1099  */
1100
1101 static struct ldlm_callback_suite cbs = {
1102         .lcs_completion = ldlm_server_completion_ast,
1103         .lcs_blocking   = ldlm_server_blocking_ast,
1104         .lcs_glimpse    = NULL
1105 };
1106
1107 static int mdt_enqueue(struct mdt_thread_info *info)
1108 {
1109         int rc;
1110         struct ptlrpc_request *req;
1111
1112         /*
1113          * info->mti_dlm_req already contains swapped and (if necessary)
1114          * converted dlm request.
1115          */
1116         LASSERT(info->mti_dlm_req != NULL);
1117
1118         req = mdt_info_req(info);
1119         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1120         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1121                                       req, info->mti_dlm_req, &cbs);
1122         return rc ? : req->rq_status;
1123 }
1124
1125 static int mdt_convert(struct mdt_thread_info *info)
1126 {
1127         int rc;
1128         struct ptlrpc_request *req;
1129
1130         LASSERT(info->mti_dlm_req);
1131         req = mdt_info_req(info);
1132         rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1133         return rc ? : req->rq_status;
1134 }
1135
1136 static int mdt_bl_callback(struct mdt_thread_info *info)
1137 {
1138         CERROR("bl callbacks should not happen on MDS\n");
1139         LBUG();
1140         return -EOPNOTSUPP;
1141 }
1142
1143 static int mdt_cp_callback(struct mdt_thread_info *info)
1144 {
1145         CERROR("cp callbacks should not happen on MDS\n");
1146         LBUG();
1147         return -EOPNOTSUPP;
1148 }
1149
1150 /*
1151  * sec context handlers
1152  */
/* Security context handler: nothing to do here, always reports success. */
static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
1157
1158 static struct mdt_object *mdt_obj(struct lu_object *o)
1159 {
1160         LASSERT(lu_device_is_mdt(o->lo_dev));
1161         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
1162 }
1163
1164 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
1165                                    struct mdt_device *d,
1166                                    const struct lu_fid *f)
1167 {
1168         struct lu_object *o;
1169         struct mdt_object *m;
1170         ENTRY;
1171
1172         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
1173         if (IS_ERR(o))
1174                 m = (struct mdt_object *)o;
1175         else
1176                 m = mdt_obj(o);
1177         RETURN(m);
1178 }
1179
1180 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1181                     struct mdt_lock_handle *lh, __u64 ibits)
1182 {
1183         ldlm_policy_data_t *policy = &info->mti_policy;
1184         struct ldlm_res_id *res_id = &info->mti_res_id;
1185         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1186         int rc;
1187         ENTRY;
1188
1189         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1190         LASSERT(lh->mlh_mode != LCK_MINMODE);
1191         if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1192                 LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1193                 LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1194         }
1195         policy->l_inodebits.bits = ibits;
1196
1197         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
1198                       policy, res_id);
1199         RETURN(rc);
1200 }
1201
1202 /* lock with cross-ref fixes */
1203 int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
1204                        struct mdt_lock_handle *lh, __u64 ibits)
1205 {
1206         if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1207                 /* cross-ref object fix */
1208                 ibits &= ~MDS_INODELOCK_UPDATE;
1209                 ibits |= MDS_INODELOCK_LOOKUP;
1210         }
1211         return mdt_object_lock(info, o, lh, ibits);
1212 }
1213
1214 /*
1215  * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1216  * to save this lock in req.  when transaction committed, req will be released,
1217  * and lock will, too.
1218  */
1219 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1220                        struct mdt_lock_handle *lh, int decref)
1221 {
1222         struct ptlrpc_request *req    = mdt_info_req(info);
1223         struct lustre_handle  *handle = &lh->mlh_lh;
1224         ldlm_mode_t            mode   = lh->mlh_mode;
1225         ENTRY;
1226
1227         if (lustre_handle_is_used(handle)) {
1228                 if (decref)
1229                         fid_unlock(mdt_object_fid(o), handle, mode);
1230                 else
1231                         ptlrpc_save_lock(req, handle, mode);
1232                 handle->cookie = 0;
1233         }
1234         EXIT;
1235 }
1236
1237 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1238                                         const struct lu_fid *f,
1239                                         struct mdt_lock_handle *lh,
1240                                         __u64 ibits)
1241 {
1242         struct mdt_object *o;
1243
1244         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
1245         if (!IS_ERR(o)) {
1246                 int rc;
1247
1248                 rc = mdt_object_lock(info, o, lh, ibits);
1249                 if (rc != 0) {
1250                         mdt_object_put(info->mti_ctxt, o);
1251                         o = ERR_PTR(rc);
1252                 }
1253         }
1254         return o;
1255 }
1256
1257 void mdt_object_unlock_put(struct mdt_thread_info * info,
1258                            struct mdt_object * o,
1259                            struct mdt_lock_handle *lh,
1260                            int decref)
1261 {
1262         mdt_object_unlock(info, o, lh, decref);
1263         mdt_object_put(info->mti_ctxt, o);
1264 }
1265
1266 static struct mdt_handler *mdt_handler_find(__u32 opc,
1267                                             struct mdt_opc_slice *supported)
1268 {
1269         struct mdt_opc_slice *s;
1270         struct mdt_handler   *h;
1271
1272         h = NULL;
1273         for (s = supported; s->mos_hs != NULL; s++) {
1274                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1275                         h = s->mos_hs + (opc - s->mos_opc_start);
1276                         if (h->mh_opc != 0)
1277                                 LASSERT(h->mh_opc == opc);
1278                         else
1279                                 h = NULL; /* unsupported opc */
1280                         break;
1281                 }
1282         }
1283         return h;
1284 }
1285
1286 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1287 {
1288         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
1289 }
1290
1291 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1292 {
1293         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
1294 }
1295
/*
 * Hook for resource-name compatibility handling of an incoming dlm request.
 * Not implemented yet: does nothing and reports success.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        int rc = 0;

        /* XXX something... later. */
        return rc;
}
1302
/*
 * Hook for compatibility handling of an outgoing dlm reply.
 * Not implemented yet: does nothing and reports success.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        int rc = 0;

        /* XXX something... later. */
        return rc;
}
1308
1309 /*
1310  * Generic code handling requests that have struct mdt_body passed in:
1311  *
1312  *  - extract mdt_body from request and save it in @info, if present;
1313  *
1314  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1315  *  @info;
1316  *
1317  *  - if HABEO_CORPUS flag is set for this request type check whether object
1318  *  actually exists on storage (lu_object_exists()).
1319  *
1320  */
1321 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1322 {
1323         const struct mdt_body   *body;
1324         struct mdt_object       *obj;
1325         const struct lu_context *ctx;
1326         struct req_capsule      *pill;
1327         int                     rc;
1328
1329         ctx = info->mti_ctxt;
1330         pill = &info->mti_pill;
1331
1332         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1333         if (body != NULL) {
1334                 if (fid_is_sane(&body->fid1)) {
1335                         obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
1336                         if (!IS_ERR(obj)) {
1337                                 if ((flags & HABEO_CORPUS) &&
1338                                     !lu_object_exists(&obj->mot_obj.mo_lu)) {
1339                                         mdt_object_put(ctx, obj);
1340                                         rc = -ENOENT;
1341                                 } else {
1342                                         info->mti_object = obj;
1343                                         rc = 0;
1344                                 }
1345                         } else
1346                                 rc = PTR_ERR(obj);
1347                 } else {
1348                         CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1349                         rc = -EINVAL;
1350                 }
1351         } else
1352                 rc = -EFAULT;
1353         return rc;
1354 }
1355
1356 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1357 {
1358         struct req_capsule *pill;
1359         int rc;
1360
1361         ENTRY;
1362         pill = &info->mti_pill;
1363
1364         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1365                 rc = mdt_body_unpack(info, flags);
1366         else
1367                 rc = 0;
1368
1369         if (rc == 0 && (flags & HABEO_REFERO)) {
1370                 struct mdt_device       *mdt = info->mti_mdt;
1371                 /*pack reply*/
1372                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1373                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1374                                              mdt->mdt_max_mdsize);
1375                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1376                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1377                                              mdt->mdt_max_cookiesize);
1378
1379                 rc = req_capsule_pack(pill);
1380         }
1381         RETURN(rc);
1382 }
1383
#if 0
struct lu_context_key mdt_txn_key;
/*
 * Currently compiled out.
 *
 * Store the transaction number and last xid of the request in its reply
 * message.  Does nothing if the reply was never packed or if the request is
 * flagged MDT_NONEED_TRANSNO.  A non-zero transno combined with a non-zero
 * @rc is reset to 0 and reported.
 */
static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
{
        struct mdt_device     *mdt = info->mti_mdt;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct obd_export     *exp;

        /* sometimes the reply message has not been successfully packed */
        if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
                return;

        if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
                return;

        /* only touch the export once @req is known to be valid */
        exp = req->rq_export;

        /*XXX: assert on this when all code will be finished */
        if (rc != 0 && info->mti_transno != 0) {
                info->mti_transno = 0;
                CERROR("Transno is not 0 while rc is %i!\n", rc);
        }

        CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
               info->mti_transno, exp->exp_obd->obd_last_committed);

        spin_lock(&mdt->mdt_transno_lock);
        req->rq_transno = info->mti_transno;
        lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);

        target_committed_to_req(req);

        spin_unlock(&mdt->mdt_transno_lock);
        lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
        //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
}
#endif
1419
1420
1421 /*
1422  * Invoke handler for this request opc. Also do necessary preprocessing
1423  * (according to handler ->mh_flags), and post-processing (setting of
1424  * ->last_{xid,committed}).
1425  */
1426 static int mdt_req_handle(struct mdt_thread_info *info,
1427                           struct mdt_handler *h, struct ptlrpc_request *req)
1428 {
1429         int   rc;
1430         __u32 flags;
1431
1432         ENTRY;
1433
1434         LASSERT(h->mh_act != NULL);
1435         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1436         LASSERT(current->journal_info == NULL);
1437
1438         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1439
1440         /*
1441          * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1442          * correct handling of failed req later in ldlm due to doing
1443          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1444          * correct actions like it is done in target_send_reply_msg().
1445          */
1446         if (h->mh_fail_id != 0) {
1447                 /* 
1448                  * Set to info->mti_fail_id to handler fail_id, it will be used
1449                  * later, and better than use default fail_id.
1450                  */
1451                 info->mti_fail_id = h->mh_fail_id;
1452                 if (OBD_FAIL_CHECK(h->mh_fail_id))
1453                         RETURN(0);
1454         }
1455
1456         rc = 0;
1457         flags = h->mh_flags;
1458         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1459
1460         if (h->mh_fmt != NULL) {
1461                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1462                 rc = mdt_unpack_req_pack_rep(info, flags);
1463         }
1464
1465         if (rc == 0 && flags & MUTABOR &&
1466             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1467                 rc = -EROFS;
1468
1469         if (rc == 0 && flags & HABEO_CLAVIS) {
1470                 struct ldlm_request *dlm_req;
1471
1472                 LASSERT(h->mh_fmt != NULL);
1473
1474                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1475                 if (dlm_req != NULL) {
1476                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1477                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1478                                                              dlm_req);
1479                         info->mti_dlm_req = dlm_req;
1480                 } else {
1481                         CERROR("Can't unpack dlm request\n");
1482                         rc = -EFAULT;
1483                 }
1484         }
1485
1486         if (rc == 0)
1487                 /*
1488                  * Process request.
1489                  */
1490                 rc = h->mh_act(info);
1491         /*
1492          * XXX result value is unconditionally shoved into ->rq_status (original
1493          * code sometimes placed error code into ->rq_status, and sometimes
1494          * returned it to the caller). ptlrpc_server_handle_request() doesn't
1495          * check return value anyway.
1496          */
1497         req->rq_status = rc;
1498         rc = 0;
1499         LASSERT(current->journal_info == NULL);
1500
1501         if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname) {
1502                 struct ldlm_reply *dlmrep;
1503
1504                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1505                 if (dlmrep != NULL)
1506                         rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1507         }
1508
1509         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1510
1511         if (rc == 0 && h->mh_opc != MDS_DISCONNECT) {
1512                 target_committed_to_req(req);
1513         }
1514         RETURN(rc);
1515 }
1516
1517 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1518 {
1519         lh->mlh_lh.cookie = 0ull;
1520         lh->mlh_mode = LCK_MINMODE;
1521 }
1522
1523 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1524 {
1525         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1526 }
1527
1528 static void mdt_thread_info_init(struct ptlrpc_request *req,
1529                                  struct mdt_thread_info *info)
1530 {
1531         int i;
1532
1533         memset(info, 0, sizeof(*info));
1534
1535         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1536         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1537                 info->mti_rep_buf_size[i] = -1;
1538
1539         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1540                 mdt_lock_handle_init(&info->mti_lh[i]);
1541
1542         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1543         info->mti_ctxt = req->rq_svc_thread->t_ctx;
1544         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1545         /* it can be NULL while CONNECT */
1546         if (req->rq_export)
1547                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1548         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1549                          info->mti_rep_buf_size);
1550 }
1551
1552 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1553 {
1554         int i;
1555
1556         req_capsule_fini(&info->mti_pill);
1557         if (info->mti_object != NULL) {
1558                 mdt_object_put(info->mti_ctxt, info->mti_object);
1559                 info->mti_object = NULL;
1560         }
1561         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1562                 mdt_lock_handle_fini(&info->mti_lh[i]);
1563 }
1564
1565 /* mds/handler.c */
1566 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1567                                        struct obd_device *obd, int *process);
1568 /*
1569  * Handle recovery. Return:
1570  *        +1: continue request processing;
1571  *       -ve: abort immediately with the given error code;
1572  *         0: send reply with error code in req->rq_status;
1573  */
1574 static int mdt_recovery(struct ptlrpc_request *req)
1575 {
1576         int recovering;
1577         int abort_recovery;
1578         struct obd_device *obd;
1579
1580         ENTRY;
1581
1582         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1583         case MDS_CONNECT:
1584         case SEC_CTX_INIT:
1585         case SEC_CTX_INIT_CONT:
1586         case SEC_CTX_FINI:
1587                 RETURN(+1);
1588         }
1589
1590         if (req->rq_export == NULL) {
1591                 CERROR("operation %d on unconnected MDS from %s\n",
1592                        lustre_msg_get_opc(req->rq_reqmsg),
1593                        libcfs_id2str(req->rq_peer));
1594                 req->rq_status = -ENOTCONN;
1595                 RETURN(-ENOTCONN);
1596         }
1597
1598         /* sanity check: if the xid matches, the request must be marked as a
1599          * resent or replayed */
1600         if (req->rq_xid == req_exp_last_xid(req) ||
1601             req->rq_xid == req_exp_last_close_xid(req)) {
1602                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1603                       (MSG_RESENT | MSG_REPLAY))) {
1604                         CERROR("rq_xid "LPU64" matches last_xid, "
1605                                 "expected RESENT flag\n", req->rq_xid);
1606                         req->rq_status = -ENOTCONN;
1607                         RETURN(-ENOTCONN);
1608                 }
1609         }
1610
1611         /* else: note the opposite is not always true; a RESENT req after a
1612          * failover will usually not match the last_xid, since it was likely
1613          * never committed. A REPLAYed request will almost never match the
1614          * last xid, however it could for a committed, but still retained,
1615          * open. */
1616
1617         obd = req->rq_export->exp_obd;
1618
1619         /* Check for aborted recovery... */
1620         spin_lock_bh(&obd->obd_processing_task_lock);
1621         abort_recovery = obd->obd_abort_recovery;
1622         recovering = obd->obd_recovering;
1623         spin_unlock_bh(&obd->obd_processing_task_lock);
1624         if (abort_recovery) {
1625                 target_abort_recovery(obd);
1626         } else if (recovering) {
1627                 int rc;
1628                 int should_process;
1629
1630                 rc = mds_filter_recovery_request(req, obd, &should_process);
1631                 if (rc != 0 || !should_process) {
1632                         RETURN(rc);
1633                 }
1634         }
1635         RETURN(+1);
1636 }
1637
1638 static int mdt_reply(struct ptlrpc_request *req, int rc,
1639                      struct mdt_thread_info *info)
1640 {
1641         struct obd_device *obd;
1642         ENTRY;
1643
1644         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1645                 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1646                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1647
1648                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1649                 if (obd && obd->obd_recovering) {
1650                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1651                         RETURN(target_queue_final_reply(req, rc));
1652                 } else {
1653                         /* Lost a race with recovery; let the error path
1654                          * DTRT. */
1655                         rc = req->rq_status = -ENOTCONN;
1656                 }
1657         }
1658         target_send_reply(req, rc, info->mti_fail_id);
1659         RETURN(0);
1660 }
1661
1662 /* mds/handler.c */
1663 extern int mds_msg_check_version(struct lustre_msg *msg);
1664
1665 static int mdt_handle0(struct ptlrpc_request *req,
1666                        struct mdt_thread_info *info,
1667                        struct mdt_opc_slice *supported)
1668 {
1669         struct mdt_handler *h;
1670         struct lustre_msg  *msg;
1671         int                 rc;
1672
1673         ENTRY;
1674
1675         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1676
1677         LASSERT(current->journal_info == NULL);
1678
1679         msg = req->rq_reqmsg;
1680         rc = mds_msg_check_version(msg);
1681         if (rc == 0) {
1682                 rc = mdt_recovery(req);
1683                 switch (rc) {
1684                 case +1:
1685                         h = mdt_handler_find(lustre_msg_get_opc(msg),
1686                                              supported);
1687                         if (h != NULL)
1688                                 rc = mdt_req_handle(info, h, req);
1689                         else {
1690                                 req->rq_status = -ENOTSUPP;
1691                                 rc = ptlrpc_error(req);
1692                                 break;
1693                         }
1694                         /* fall through */
1695                 case 0:
1696                         rc = mdt_reply(req, rc, info);
1697                 }
1698         } else
1699                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1700         RETURN(rc);
1701 }
1702
1703 /*
1704  * MDT handler function called by ptlrpc service thread when request comes.
1705  *
1706  * XXX common "target" functionality should be factored into separate module
1707  * shared by mdt, ost and stand-alone services like fld.
1708  */
1709 static int mdt_handle_common(struct ptlrpc_request *req,
1710                              struct mdt_opc_slice *supported)
1711 {
1712         struct lu_context      *ctx;
1713         struct mdt_thread_info *info;
1714         int                     rc;
1715         ENTRY;
1716
1717         ctx = req->rq_svc_thread->t_ctx;
1718         LASSERT(ctx != NULL);
1719         LASSERT(ctx->lc_thread == req->rq_svc_thread);
1720         info = lu_context_key_get(ctx, &mdt_thread_key);
1721         LASSERT(info != NULL);
1722
1723         mdt_thread_info_init(req, info);
1724
1725         rc = mdt_handle0(req, info, supported);
1726
1727         mdt_thread_info_fini(info);
1728         RETURN(rc);
1729 }
1730
1731 static int mdt_regular_handle(struct ptlrpc_request *req)
1732 {
1733         return mdt_handle_common(req, mdt_regular_handlers);
1734 }
1735
1736 static int mdt_readpage_handle(struct ptlrpc_request *req)
1737 {
1738         return mdt_handle_common(req, mdt_readpage_handlers);
1739 }
1740
1741 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1742 {
1743         return mdt_handle_common(req, mdt_seq_handlers);
1744 }
1745
1746 static int mdt_mdss_handle(struct ptlrpc_request *req)
1747 {
1748         return mdt_handle_common(req, mdt_seq_handlers);
1749 }
1750
1751 static int mdt_dtss_handle(struct ptlrpc_request *req)
1752 {
1753         return mdt_handle_common(req, mdt_seq_handlers);
1754 }
1755
1756 static int mdt_fld_handle(struct ptlrpc_request *req)
1757 {
1758         return mdt_handle_common(req, mdt_fld_handlers);
1759 }
1760
/*
 * Dense intent codes used to index mdt_it_flavor[]; mdt_intent_code() maps
 * the wire IT_* values onto them.  MDT_IT_NR is the number of codes.
 */
enum mdt_it_code {
        MDT_IT_OPEN = 0,        /* IT_OPEN */
        MDT_IT_OCREAT,          /* IT_OPEN | IT_CREAT */
        MDT_IT_CREATE,          /* IT_CREAT */
        MDT_IT_GETATTR,         /* IT_GETATTR */
        MDT_IT_READDIR,         /* IT_READDIR */
        MDT_IT_LOOKUP,          /* IT_LOOKUP */
        MDT_IT_UNLINK,          /* IT_UNLINK */
        MDT_IT_TRUNC,           /* IT_TRUNC */
        MDT_IT_GETXATTR,        /* IT_GETXATTR */
        MDT_IT_NR
};
1773
/*
 * Intent executors; declared here because mdt_it_flavor[] refers to them
 * before their definitions.
 */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **lockp,
                              int flags);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **lockp,
                            int flags);
1782
1783 static struct mdt_it_flavor {
1784         const struct req_format *it_fmt;
1785         __u32                    it_flags;
1786         int                    (*it_act)(enum mdt_it_code ,
1787                                          struct mdt_thread_info *,
1788                                          struct ldlm_lock **,
1789                                          int);
1790         long                     it_reint;
1791 } mdt_it_flavor[] = {
1792         [MDT_IT_OPEN]     = {
1793                 .it_fmt   = &RQF_LDLM_INTENT,
1794                 /*.it_flags = HABEO_REFERO,*/
1795                 .it_flags = 0,
1796                 .it_act   = mdt_intent_reint,
1797                 .it_reint = REINT_OPEN
1798         },
1799         [MDT_IT_OCREAT]   = {
1800                 .it_fmt   = &RQF_LDLM_INTENT,
1801                 .it_flags = MUTABOR,
1802                 .it_act   = mdt_intent_reint,
1803                 .it_reint = REINT_OPEN
1804         },
1805         [MDT_IT_CREATE]   = {
1806                 .it_fmt   = &RQF_LDLM_INTENT,
1807                 .it_flags = MUTABOR,
1808                 .it_act   = mdt_intent_reint,
1809                 .it_reint = REINT_CREATE
1810         },
1811         [MDT_IT_GETATTR]  = {
1812                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1813                 .it_flags = HABEO_REFERO,
1814                 .it_act   = mdt_intent_getattr
1815         },
1816         [MDT_IT_READDIR]  = {
1817                 .it_fmt   = NULL,
1818                 .it_flags = 0,
1819                 .it_act   = NULL
1820         },
1821         [MDT_IT_LOOKUP]   = {
1822                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1823                 .it_flags = HABEO_REFERO,
1824                 .it_act   = mdt_intent_getattr
1825         },
1826         [MDT_IT_UNLINK]   = {
1827                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
1828                 .it_flags = MUTABOR,
1829                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
1830                 .it_reint = REINT_UNLINK
1831         },
1832         [MDT_IT_TRUNC]    = {
1833                 .it_fmt   = NULL,
1834                 .it_flags = MUTABOR,
1835                 .it_act   = NULL
1836         },
1837         [MDT_IT_GETXATTR] = {
1838                 .it_fmt   = NULL,
1839                 .it_flags = 0,
1840                 .it_act   = NULL
1841         }
1842 };
1843
1844 int mdt_intent_lock_replace(struct mdt_thread_info *info, 
1845                             struct ldlm_lock **lockp,
1846                             struct ldlm_lock *new_lock,
1847                             struct mdt_lock_handle *lh,
1848                             int flags)
1849 {
1850         struct ptlrpc_request  *req = mdt_info_req(info);
1851         struct ldlm_lock       *lock = *lockp;
1852
1853         /*
1854          * Get new lock only for cases when possible resent did not find any
1855          * lock.
1856          */
1857         if (new_lock == NULL)
1858                 new_lock = ldlm_handle2lock(&lh->mlh_lh);
1859
1860         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1861                 RETURN(0);
1862
1863         LASSERTF(new_lock != NULL,
1864                  "lockh "LPX64"\n", lh->mlh_lh.cookie);
1865
1866         /*
1867          * If we've already given this lock to a client once, then we should
1868          * have no readers or writers.  Otherwise, we should have one reader
1869          * _or_ writer ref (which will be zeroed below) before returning the
1870          * lock to a client.
1871          */
1872         if (new_lock->l_export == req->rq_export) {
1873                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1874         } else {
1875                 LASSERT(new_lock->l_export == NULL);
1876                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1877         }
1878
1879         *lockp = new_lock;
1880
1881         if (new_lock->l_export == req->rq_export) {
1882                 /*
1883                  * Already gave this to the client, which means that we
1884                  * reconstructed a reply.
1885                  */
1886                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1887                         MSG_RESENT);
1888                 RETURN(ELDLM_LOCK_REPLACED);
1889         }
1890
1891         /* Fixup the lock to be given to the client */
1892         lock_res_and_lock(new_lock);
1893         new_lock->l_readers = 0;
1894         new_lock->l_writers = 0;
1895
1896         new_lock->l_export = class_export_get(req->rq_export);
1897         list_add(&new_lock->l_export_chain,
1898                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1899
1900         new_lock->l_blocking_ast = lock->l_blocking_ast;
1901         new_lock->l_completion_ast = lock->l_completion_ast;
1902         new_lock->l_remote_handle = lock->l_remote_handle;
1903         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1904
1905         unlock_res_and_lock(new_lock);
1906         LDLM_LOCK_PUT(new_lock);
1907         lh->mlh_lh.cookie = 0;
1908
1909         RETURN(ELDLM_LOCK_REPLACED);
1910 }
1911
1912 static void mdt_fixup_resent(struct req_capsule *pill,
1913                              struct ldlm_lock *new_lock,
1914                              struct ldlm_lock **old_lock,
1915                              struct mdt_lock_handle *lh)
1916 {
1917         struct ptlrpc_request  *req = pill->rc_req;
1918         struct obd_export      *exp = req->rq_export;
1919         struct lustre_handle    remote_hdl;
1920         struct ldlm_request    *dlmreq;
1921         struct list_head       *iter;
1922
1923         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1924                 return;
1925
1926         dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
1927         remote_hdl = dlmreq->lock_handle1;
1928         
1929         spin_lock(&exp->exp_ldlm_data.led_lock);
1930         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1931                 struct ldlm_lock *lock;
1932                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1933                 if (lock == new_lock)
1934                         continue;
1935                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1936                         lh->mlh_lh.cookie = lock->l_handle.h_cookie;
1937                         lh->mlh_mode = lock->l_granted_mode;
1938                         
1939                         LDLM_DEBUG(lock, "restoring lock cookie");
1940                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1941                                   lh->mlh_lh.cookie);
1942                         if (old_lock)
1943                                 *old_lock = LDLM_LOCK_GET(lock);
1944                         spin_unlock(&exp->exp_ldlm_data.led_lock);
1945                         return;
1946                 }
1947         }
1948         spin_unlock(&exp->exp_ldlm_data.led_lock);
1949
1950         /*
1951          * If the xid matches, then we know this is a resent request, and allow
1952          * it. (It's probably an OPEN, for which we don't send a lock.
1953          */
1954         if (req->rq_xid == req_exp_last_xid(req))
1955                 return;
1956
1957         if (req->rq_xid == req_exp_last_close_xid(req))
1958                 return;
1959
1960         /*
1961          * This remote handle isn't enqueued, so we never received or processed
1962          * this request.  Clear MSG_RESENT, because it can be handled like any
1963          * normal request now.
1964          */
1965         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1966
1967         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1968                   remote_hdl.cookie);
1969 }
1970
1971 static int mdt_intent_getattr(enum mdt_it_code opcode,
1972                               struct mdt_thread_info *info,
1973                               struct ldlm_lock **lockp,
1974                               int flags)
1975 {
1976         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
1977         struct ldlm_lock       *new_lock = NULL;
1978         __u64                   child_bits;
1979         struct ldlm_reply      *ldlm_rep;
1980         struct ptlrpc_request  *req;
1981
1982         ENTRY;
1983
1984         switch (opcode) {
1985         case MDT_IT_LOOKUP:
1986                 child_bits = MDS_INODELOCK_LOOKUP;
1987                 break;
1988         case MDT_IT_GETATTR:
1989                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1990                 break;
1991         default:
1992                 CERROR("Unhandled till now");
1993                 RETURN(-EINVAL);
1994         }
1995
1996         req = info->mti_pill.rc_req;
1997         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1998         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
1999
2000         /* Get lock from request for possible resent case. */
2001         mdt_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
2002         
2003         ldlm_rep->lock_policy_res2 =
2004                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2005         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
2006
2007         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2008                 ldlm_rep->lock_policy_res2 = 0;
2009         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2010                     ldlm_rep->lock_policy_res2) {
2011                 RETURN(ELDLM_LOCK_ABORTED);
2012         }
2013
2014         return mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
2015 }
2016
2017 static int mdt_intent_reint(enum mdt_it_code opcode,
2018                             struct mdt_thread_info *info,
2019                             struct ldlm_lock **lockp,
2020                             int flags)
2021 {
2022         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2023         struct ldlm_reply      *rep;
2024         long                    opc;
2025         int                     rc;
2026
2027         static const struct req_format *intent_fmts[REINT_MAX] = {
2028                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
2029                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
2030         };
2031
2032         ENTRY;
2033
2034         opc = mdt_reint_opcode(info, intent_fmts);
2035         if (opc < 0)
2036                 RETURN(opc);
2037
2038         if (mdt_it_flavor[opcode].it_reint != opc) {
2039                 CERROR("Reint code %ld doesn't match intent: %d\n",
2040                        opc, opcode);
2041                 RETURN(-EPROTO);
2042         }
2043
2044         /* Get lock from request for possible resent case. */
2045         mdt_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
2046         
2047         rc = mdt_reint_internal(info, lhc, opc);
2048
2049         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2050         if (rep == NULL)
2051                 RETURN(-EFAULT);
2052         
2053         /* MDC expects this in any case */
2054         if (rc != 0)
2055                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
2056
2057         rep->lock_policy_res2 = rc;
2058
2059         /* cross-ref case, the lock should be returned to the client */
2060         if (rc == -EREMOTE) {
2061                 LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
2062                 rep->lock_policy_res2 = 0;
2063                 return mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
2064         }
2065         rep->lock_policy_res2 = rc;
2066
2067         RETURN(ELDLM_LOCK_ABORTED);
2068 }
2069
2070 static int mdt_intent_code(long itcode)
2071 {
2072         int rc;
2073
2074         switch(itcode) {
2075         case IT_OPEN:
2076                 rc = MDT_IT_OPEN;
2077                 break;
2078         case IT_OPEN|IT_CREAT:
2079                 rc = MDT_IT_OCREAT;
2080                 break;
2081         case IT_CREAT:
2082                 rc = MDT_IT_CREATE;
2083                 break;
2084         case IT_READDIR:
2085                 rc = MDT_IT_READDIR;
2086                 break;
2087         case IT_GETATTR:
2088                 rc = MDT_IT_GETATTR;
2089                 break;
2090         case IT_LOOKUP:
2091                 rc = MDT_IT_LOOKUP;
2092                 break;
2093         case IT_UNLINK:
2094                 rc = MDT_IT_UNLINK;
2095                 break;
2096         case IT_TRUNC:
2097                 rc = MDT_IT_TRUNC;
2098                 break;
2099         case IT_GETXATTR:
2100                 rc = MDT_IT_GETXATTR;
2101                 break;
2102         default:
2103                 CERROR("Unknown intent opcode: %ld\n", itcode);
2104                 rc = -EINVAL;
2105                 break;
2106         }
2107         return rc;
2108 }
2109
2110 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2111                           struct ldlm_lock **lockp, int flags)
2112 {
2113         struct req_capsule   *pill;
2114         struct mdt_it_flavor *flv;
2115         int opc;
2116         int rc;
2117         ENTRY;
2118
2119         opc = mdt_intent_code(itopc);
2120         if (opc < 0)
2121                 RETURN(-EINVAL);
2122
2123         pill = &info->mti_pill;
2124         flv  = &mdt_it_flavor[opc];
2125
2126         if (flv->it_fmt != NULL)
2127                 req_capsule_extend(pill, flv->it_fmt);
2128
2129         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2130         if (rc == 0) {
2131                 struct ptlrpc_request *req = mdt_info_req(info);
2132                 if (flv->it_flags & MUTABOR &&
2133                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2134                         rc = -EROFS;
2135         }
2136         if (rc == 0 && flv->it_act != NULL) {
2137                 /* execute policy */
2138                 rc = flv->it_act(opc, info, lockp, flags);
2139         } else
2140                 rc = -EOPNOTSUPP;
2141         RETURN(rc);
2142 }
2143
2144 static int mdt_intent_policy(struct ldlm_namespace *ns,
2145                              struct ldlm_lock **lockp, void *req_cookie,
2146                              ldlm_mode_t mode, int flags, void *data)
2147 {
2148         struct mdt_thread_info *info;
2149         struct ptlrpc_request  *req  =  req_cookie;
2150         struct ldlm_intent     *it;
2151         struct req_capsule     *pill;
2152         struct ldlm_lock       *lock = *lockp;
2153         int rc;
2154
2155         ENTRY;
2156
2157         LASSERT(req != NULL);
2158
2159         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
2160         LASSERT(info != NULL);
2161         pill = &info->mti_pill;
2162         LASSERT(pill->rc_req == req);
2163
2164         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2165                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2166                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2167                 if (it != NULL) {
2168                         LDLM_DEBUG(lock, "intent policy opc: %s\n",
2169                                    ldlm_it2str(it->opc));
2170
2171                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
2172                         if (rc == 0)
2173                                 rc = ELDLM_OK;
2174                 } else
2175                         rc = -EFAULT;
2176         } else {
2177                 /* No intent was provided */
2178                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2179                 rc = req_capsule_pack(pill);
2180         }
2181         RETURN(rc);
2182 }
2183
2184 /*
2185  * Seq wrappers
2186  */
2187 static int mdt_seq_fini(const struct lu_context *ctx,
2188                         struct mdt_device *m)
2189 {
2190         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2191         ENTRY;
2192
2193         if (ls && ls->ls_server_seq) {
2194                 seq_server_fini(ls->ls_server_seq, ctx);
2195                 OBD_FREE_PTR(ls->ls_server_seq);
2196                 ls->ls_server_seq = NULL;
2197         }
2198         
2199         if (ls && ls->ls_control_seq) {
2200                 seq_server_fini(ls->ls_control_seq, ctx);
2201                 OBD_FREE_PTR(ls->ls_control_seq);
2202                 ls->ls_control_seq = NULL;
2203         }
2204
2205         if (ls && ls->ls_client_seq) {
2206                 seq_client_fini(ls->ls_client_seq);
2207                 OBD_FREE_PTR(ls->ls_client_seq);
2208                 ls->ls_client_seq = NULL;
2209         }
2210
2211         RETURN(0);
2212 }
2213
2214 static int mdt_seq_init(const struct lu_context *ctx,
2215                         const char *uuid,
2216                         struct mdt_device *m)
2217 {
2218         struct lu_site *ls;
2219         char *prefix;
2220         int rc;
2221         ENTRY;
2222
2223         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2224
2225         /*
2226          * This is sequence-controller node. Init seq-controller server on local
2227          * MDT.
2228          */
2229         if (ls->ls_node_id == 0) {
2230                 LASSERT(ls->ls_control_seq == NULL);
2231
2232                 OBD_ALLOC_PTR(ls->ls_control_seq);
2233                 if (ls->ls_control_seq == NULL)
2234                         RETURN(-ENOMEM);
2235
2236                 rc = seq_server_init(ls->ls_control_seq,
2237                                      m->mdt_bottom, uuid,
2238                                      LUSTRE_SEQ_CONTROLLER,
2239                                      ctx);
2240
2241                 if (rc)
2242                         GOTO(out_seq_fini, rc);
2243                 
2244                 OBD_ALLOC_PTR(ls->ls_client_seq);
2245                 if (ls->ls_client_seq == NULL)
2246                         GOTO(out_seq_fini, rc = -ENOMEM);
2247
2248                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2249                 if (prefix == NULL) {
2250                         OBD_FREE_PTR(ls->ls_client_seq);
2251                         GOTO(out_seq_fini, rc = -ENOMEM);
2252                 }
2253
2254                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2255                          uuid);
2256
2257                 /*
2258                  * Init seq-controller client after seq-controller server is
2259                  * ready. Pass ls->ls_control_seq to it for direct talking.
2260                  */
2261                 rc = seq_client_init(ls->ls_client_seq, NULL,
2262                                      LUSTRE_SEQ_METADATA, prefix,
2263                                      ls->ls_control_seq);
2264                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2265
2266                 if (rc)
2267                         GOTO(out_seq_fini, rc);
2268         }
2269
2270         /* Init seq-server on local MDT */
2271         LASSERT(ls->ls_server_seq == NULL);
2272         
2273         OBD_ALLOC_PTR(ls->ls_server_seq);
2274         if (ls->ls_server_seq == NULL)
2275                 GOTO(out_seq_fini, rc = -ENOMEM);
2276
2277         rc = seq_server_init(ls->ls_server_seq,
2278                              m->mdt_bottom, uuid,
2279                              LUSTRE_SEQ_SERVER,
2280                              ctx);
2281         if (rc)
2282                 GOTO(out_seq_fini, rc = -ENOMEM);
2283
2284         /* Assign seq-controller client to local seq-server. */
2285         if (ls->ls_node_id == 0) {
2286                 LASSERT(ls->ls_client_seq != NULL);
2287                 
2288                 rc = seq_server_set_cli(ls->ls_server_seq,
2289                                         ls->ls_client_seq,
2290                                         ctx);
2291         }
2292         
2293         EXIT;
2294 out_seq_fini:
2295         if (rc)
2296                 mdt_seq_fini(ctx, m);
2297
2298         return rc;
2299 }
2300
2301 /*
2302  * Init client sequence manager which is used by local MDS to talk to sequence
2303  * controller on remote node.
2304  */
2305 static int mdt_seq_init_cli(const struct lu_context *ctx,
2306                             struct mdt_device *m,
2307                             struct lustre_cfg *cfg)
2308 {
2309         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2310         struct obd_device *mdc;
2311         struct obd_uuid   *uuidp, *mdcuuidp;
2312         char              *uuid_str, *mdc_uuid_str;
2313         int               rc;
2314         int               index;
2315         struct mdt_thread_info *info;
2316         char *p, *index_string = lustre_cfg_string(cfg, 2);
2317         ENTRY;
2318
2319         info = lu_context_key_get(ctx, &mdt_thread_key);
2320         uuidp = &info->mti_u.uuid[0];
2321         mdcuuidp = &info->mti_u.uuid[1];
2322
2323         LASSERT(index_string);
2324
2325         index = simple_strtol(index_string, &p, 10);
2326         if (*p) {
2327                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2328                 RETURN(-EINVAL);
2329         }
2330
2331         /* check if this is adding the first MDC and controller is not yet
2332          * initialized. */
2333         if (index != 0 || ls->ls_client_seq)
2334                 RETURN(0);
2335
2336         uuid_str = lustre_cfg_string(cfg, 1);
2337         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2338         obd_str2uuid(uuidp, uuid_str);
2339         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2340
2341         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2342         if (!mdc) {
2343                 CERROR("can't find controller MDC by uuid %s\n",
2344                        uuid_str);
2345                 rc = -ENOENT;
2346         } else if (!mdc->obd_set_up) {
2347                 CERROR("target %s not set up\n", mdc->obd_name);
2348                 rc = -EINVAL;
2349         } else {
2350                 struct lustre_handle conn = {0, };
2351
2352                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2353                        mdc->obd_name, mdc->obd_uuid.uuid);
2354
2355                 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
2356
2357                 if (rc) {
2358                         CERROR("target %s connect error %d\n",
2359                                mdc->obd_name, rc);
2360                 } else {
2361                         ls->ls_control_exp = class_conn2export(&conn);
2362
2363                         OBD_ALLOC_PTR(ls->ls_client_seq);
2364
2365                         if (ls->ls_client_seq != NULL) {
2366                                 char *prefix;
2367
2368                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2369                                 if (!prefix)
2370                                         RETURN(-ENOMEM);
2371
2372                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2373                                          mdc->obd_name);
2374
2375                                 rc = seq_client_init(ls->ls_client_seq,
2376                                                      ls->ls_control_exp,
2377                                                      LUSTRE_SEQ_METADATA,
2378                                                      prefix, NULL);
2379                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2380                         } else
2381                                 rc = -ENOMEM;
2382
2383                         if (rc)
2384                                 RETURN(rc);
2385
2386                         LASSERT(ls->ls_server_seq != NULL);
2387
2388                         rc = seq_server_set_cli(ls->ls_server_seq,
2389                                                 ls->ls_client_seq,
2390                                                 ctx);
2391                 }
2392         }
2393
2394         RETURN(rc);
2395 }
2396
2397 static void mdt_seq_fini_cli(struct mdt_device *m)
2398 {
2399         struct lu_site *ls;
2400         int rc;
2401
2402         ENTRY;
2403
2404         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2405
2406         if (ls && ls->ls_server_seq)
2407                 seq_server_set_cli(ls->ls_server_seq,
2408                                    NULL, NULL);
2409
2410         if (ls && ls->ls_control_exp) {
2411                 rc = obd_disconnect(ls->ls_control_exp);
2412                 if (rc) {
2413                         CERROR("failure to disconnect "
2414                                "obd: %d\n", rc);
2415                 }
2416                 ls->ls_control_exp = NULL;
2417         }
2418         EXIT;
2419 }
2420
2421 /*
2422  * FLD wrappers
2423  */
2424 static int mdt_fld_fini(const struct lu_context *ctx,
2425                         struct mdt_device *m)
2426 {
2427         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2428         ENTRY;
2429
2430         if (ls && ls->ls_server_fld) {
2431                 fld_server_fini(ls->ls_server_fld, ctx);
2432                 OBD_FREE_PTR(ls->ls_server_fld);
2433                 ls->ls_server_fld = NULL;
2434         }
2435
2436         if (ls && ls->ls_client_fld != NULL) {
2437                 fld_client_fini(ls->ls_client_fld);
2438                 OBD_FREE_PTR(ls->ls_client_fld);
2439                 ls->ls_client_fld = NULL;
2440         }
2441
2442         RETURN(0);
2443 }
2444
2445 static int mdt_fld_init(const struct lu_context *ctx,
2446                         const char *uuid,
2447                         struct mdt_device *m)
2448 {
2449         struct lu_fld_target target;
2450         struct lu_site *ls;
2451         int rc;
2452         ENTRY;
2453
2454         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2455
2456         OBD_ALLOC_PTR(ls->ls_server_fld);
2457         if (ls->ls_server_fld == NULL)
2458                 RETURN(rc = -ENOMEM);
2459
2460         rc = fld_server_init(ls->ls_server_fld,
2461                              m->mdt_bottom, uuid, ctx);
2462         if (rc) {
2463                 OBD_FREE_PTR(ls->ls_server_fld);
2464                 ls->ls_server_fld = NULL;
2465         }
2466
2467         OBD_ALLOC_PTR(ls->ls_client_fld);
2468         if (!ls->ls_client_fld)
2469                 GOTO(out_fld_fini, rc = -ENOMEM);
2470
2471         rc = fld_client_init(ls->ls_client_fld, uuid,
2472                              LUSTRE_CLI_FLD_HASH_DHT);
2473         if (rc) {
2474                 CERROR("can't init FLD, err %d\n",  rc);        
2475                 OBD_FREE_PTR(ls->ls_client_fld);
2476                 GOTO(out_fld_fini, rc);
2477         }
2478
2479         target.ft_srv = ls->ls_server_fld;
2480         target.ft_idx = ls->ls_node_id;
2481         target.ft_exp = NULL;
2482         
2483         fld_client_add_target(ls->ls_client_fld, &target);
2484         EXIT;
2485 out_fld_fini:
2486         if (rc)
2487                 mdt_fld_fini(ctx, m);
2488         return rc;
2489 }
2490
2491 /* device init/fini methods */
2492 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2493 {
2494         if (m->mdt_regular_service != NULL) {
2495                 ptlrpc_unregister_service(m->mdt_regular_service);
2496                 m->mdt_regular_service = NULL;
2497         }
2498         if (m->mdt_readpage_service != NULL) {
2499                 ptlrpc_unregister_service(m->mdt_readpage_service);
2500                 m->mdt_readpage_service = NULL;
2501         }
2502         if (m->mdt_setattr_service != NULL) {
2503                 ptlrpc_unregister_service(m->mdt_setattr_service);
2504                 m->mdt_setattr_service = NULL;
2505         }
2506         if (m->mdt_mdsc_service != NULL) {
2507                 ptlrpc_unregister_service(m->mdt_mdsc_service);
2508                 m->mdt_mdsc_service = NULL;
2509         }
2510         if (m->mdt_mdss_service != NULL) {
2511                 ptlrpc_unregister_service(m->mdt_mdss_service);
2512                 m->mdt_mdss_service = NULL;
2513         }
2514         if (m->mdt_dtss_service != NULL) {
2515                 ptlrpc_unregister_service(m->mdt_dtss_service);
2516                 m->mdt_dtss_service = NULL;
2517         }
2518         if (m->mdt_fld_service != NULL) {
2519                 ptlrpc_unregister_service(m->mdt_fld_service);
2520                 m->mdt_fld_service = NULL;
2521         }
2522 }
2523
2524 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2525 {
2526         int rc;
2527         static struct ptlrpc_service_conf conf;
2528         ENTRY;
2529
2530         conf = (typeof(conf)) {
2531                 .psc_nbufs            = MDS_NBUFS,
2532                 .psc_bufsize          = MDS_BUFSIZE,
2533                 .psc_max_req_size     = MDS_MAXREQSIZE,
2534                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2535                 .psc_req_portal       = MDS_REQUEST_PORTAL,
2536                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2537                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2538                 /*
2539                  * We'd like to have a mechanism to set this on a per-device
2540                  * basis, but alas...
2541                  */
2542                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2543                                        MDT_MAX_THREADS),
2544                 .psc_ctx_tags      = LCT_MD_THREAD
2545         };
2546
2547         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2548         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2549                            "mdt_ldlm_client", m->mdt_ldlm_client);
2550
2551         m->mdt_regular_service =
2552                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2553                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2554                                      NULL);
2555         if (m->mdt_regular_service == NULL)
2556                 RETURN(-ENOMEM);
2557
2558         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2559         if (rc)
2560                 GOTO(err_mdt_svc, rc);
2561
2562         /*
2563          * readpage service configuration. Parameters have to be adjusted,
2564          * ideally.
2565          */
2566         conf = (typeof(conf)) {
2567                 .psc_nbufs            = MDS_NBUFS,
2568                 .psc_bufsize          = MDS_BUFSIZE,
2569                 .psc_max_req_size     = MDS_MAXREQSIZE,
2570                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2571                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2572                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2573                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2574                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2575                                        MDT_MAX_THREADS),
2576                 .psc_ctx_tags      = LCT_MD_THREAD
2577         };
2578         m->mdt_readpage_service =
2579                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2580                                      LUSTRE_MDT_NAME "_readpage",
2581                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2582                                      NULL);
2583
2584         if (m->mdt_readpage_service == NULL) {
2585                 CERROR("failed to start readpage service\n");
2586                 GOTO(err_mdt_svc, rc = -ENOMEM);
2587         }
2588
2589         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2590
2591         /*
2592          * setattr service configuration.
2593          */
2594         conf = (typeof(conf)) {
2595                 .psc_nbufs            = MDS_NBUFS,
2596                 .psc_bufsize          = MDS_BUFSIZE,
2597                 .psc_max_req_size     = MDS_MAXREQSIZE,
2598                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2599                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2600                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2601                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2602                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2603                                        MDT_MAX_THREADS),
2604                 .psc_ctx_tags      = LCT_MD_THREAD
2605         };
2606
2607         m->mdt_setattr_service =
2608                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2609                                      LUSTRE_MDT_NAME "_setattr",
2610                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2611                                      NULL);
2612
2613         if (!m->mdt_setattr_service) {
2614                 CERROR("failed to start setattr service\n");
2615                 GOTO(err_mdt_svc, rc = -ENOMEM);
2616         }
2617
2618         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2619         if (rc)
2620                 GOTO(err_mdt_svc, rc);
2621
2622         /*
2623          * sequence controller service configuration
2624          */
2625         conf = (typeof(conf)) {
2626                 .psc_nbufs = MDS_NBUFS,
2627                 .psc_bufsize = MDS_BUFSIZE,
2628                 .psc_max_req_size = SEQ_MAXREQSIZE,
2629                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2630                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2631                 .psc_rep_portal = MDC_REPLY_PORTAL,
2632                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2633                 .psc_num_threads = SEQ_NUM_THREADS,
2634                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2635         };
2636
2637         m->mdt_mdsc_service =
2638                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2639                                      LUSTRE_MDT_NAME"_mdsc",
2640                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2641                                      NULL);
2642         if (!m->mdt_mdsc_service) {
2643                 CERROR("failed to start seq controller service\n");
2644                 GOTO(err_mdt_svc, rc = -ENOMEM);
2645         }
2646
2647         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2648         if (rc)
2649                 GOTO(err_mdt_svc, rc);
2650
2651         /*
2652          * metadata sequence server service configuration
2653          */
2654         conf = (typeof(conf)) {
2655                 .psc_nbufs = MDS_NBUFS,
2656                 .psc_bufsize = MDS_BUFSIZE,
2657                 .psc_max_req_size = SEQ_MAXREQSIZE,
2658                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2659                 .psc_req_portal = SEQ_METADATA_PORTAL,
2660                 .psc_rep_portal = MDC_REPLY_PORTAL,
2661                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2662                 .psc_num_threads = SEQ_NUM_THREADS,
2663                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2664         };
2665
2666         m->mdt_mdss_service =
2667                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2668                                      LUSTRE_MDT_NAME"_mdss",
2669                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2670                                      NULL);
2671         if (!m->mdt_mdss_service) {
2672                 CERROR("failed to start metadata seq server service\n");
2673                 GOTO(err_mdt_svc, rc = -ENOMEM);
2674         }
2675
2676         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2677         if (rc)
2678                 GOTO(err_mdt_svc, rc);
2679
2680
2681         /*
2682          * Data sequence server service configuration. We want to have really
2683          * cluster-wide sequences space. This is why we start only one sequence
2684          * controller which manages space.
2685          */
2686         conf = (typeof(conf)) {
2687                 .psc_nbufs = MDS_NBUFS,
2688                 .psc_bufsize = MDS_BUFSIZE,
2689                 .psc_max_req_size = SEQ_MAXREQSIZE,
2690                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2691                 .psc_req_portal = SEQ_DATA_PORTAL,
2692                 .psc_rep_portal = OSC_REPLY_PORTAL,
2693                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2694                 .psc_num_threads = SEQ_NUM_THREADS,
2695                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2696         };
2697
2698         m->mdt_dtss_service =
2699                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2700                                      LUSTRE_MDT_NAME"_dtss",
2701                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2702                                      NULL);
2703         if (!m->mdt_dtss_service) {
2704                 CERROR("failed to start data seq server service\n");
2705                 GOTO(err_mdt_svc, rc = -ENOMEM);
2706         }
2707
2708         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2709         if (rc)
2710                 GOTO(err_mdt_svc, rc);
2711
2712         /* FLD service start */
2713         conf = (typeof(conf)) {
2714                 .psc_nbufs            = MDS_NBUFS,
2715                 .psc_bufsize          = MDS_BUFSIZE,
2716                 .psc_max_req_size     = FLD_MAXREQSIZE,
2717                 .psc_max_reply_size   = FLD_MAXREPSIZE,
2718                 .psc_req_portal       = FLD_REQUEST_PORTAL,
2719                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2720                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2721                 .psc_num_threads      = FLD_NUM_THREADS,
2722                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
2723         };
2724
2725         m->mdt_fld_service =
2726                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2727                                      LUSTRE_MDT_NAME"_fld",
2728                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2729                                      NULL);
2730         if (!m->mdt_fld_service) {
2731                 CERROR("failed to start fld service\n");
2732                 GOTO(err_mdt_svc, rc = -ENOMEM);
2733         }
2734
2735         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
2736         if (rc)
2737                 GOTO(err_mdt_svc, rc);
2738
2739         EXIT;
2740 err_mdt_svc:
2741         if (rc)
2742                 mdt_stop_ptlrpc_service(m);
2743
2744         return rc;
2745 }
2746
2747 static void mdt_stack_fini(const struct lu_context *ctx,
2748                            struct mdt_device *m, struct lu_device *top)
2749 {
2750         struct lu_device        *d = top, *n;
2751         struct lustre_cfg_bufs  *bufs;
2752         struct lustre_cfg       *lcfg;
2753         struct mdt_thread_info  *info;
2754         ENTRY;
2755
2756         info = lu_context_key_get(ctx, &mdt_thread_key);
2757         LASSERT(info != NULL);
2758
2759         bufs = &info->mti_u.bufs;
2760         /* process cleanup */
2761         lustre_cfg_bufs_reset(bufs, NULL);
2762         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2763         if (!lcfg) {
2764                 CERROR("Cannot alloc lcfg!\n");
2765                 return;
2766         }
2767         LASSERT(top);
2768         top->ld_ops->ldo_process_config(ctx, top, lcfg);
2769         lustre_cfg_free(lcfg);
2770
2771         lu_site_purge(ctx, top->ld_site, ~0);
2772         while (d != NULL) {
2773                 struct obd_type *type;
2774                 struct lu_device_type *ldt = d->ld_type;
2775
2776                 /* each fini() returns next device in stack of layers
2777                  * * so we can avoid the recursion */
2778                 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2779                 lu_device_put(d);
2780                 ldt->ldt_ops->ldto_device_free(ctx, d);
2781                 type = ldt->ldt_obd_type;
2782                 type->typ_refcnt--;
2783                 class_put_type(type);
2784                 
2785                 /* switch to the next device in the layer */
2786                 d = n;
2787         }
2788         m->mdt_child = NULL;
2789 }
2790
2791 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2792                                          const char *typename,
2793                                          struct lu_device *child,
2794                                          struct lustre_cfg *cfg)
2795 {
2796         struct obd_type       *type;
2797         struct lu_device_type *ldt;
2798         struct lu_device      *d;
2799         int rc;
2800         ENTRY;
2801         
2802         /* find the type */
2803         type = class_get_type(typename);
2804         if (!type) {
2805                 CERROR("Unknown type: '%s'\n", typename);
2806                 GOTO(out, rc = -ENODEV);
2807         }
2808
2809         rc = lu_context_refill(ctx);
2810         if (rc != 0) {
2811                 CERROR("Failure to refill context: '%d'\n", rc);
2812                 GOTO(out_type, rc);
2813         }
2814
2815         ldt = type->typ_lu;
2816         if (ldt == NULL) {
2817                 CERROR("type: '%s'\n", typename);
2818                 GOTO(out_type, rc = -EINVAL);
2819         }
2820
2821         ldt->ldt_obd_type = type;
2822         d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2823         if (IS_ERR(d)) {
2824                 CERROR("Cannot allocate device: '%s'\n", typename);
2825                 GOTO(out_type, rc = -ENODEV);
2826         }
2827
2828         LASSERT(child->ld_site);
2829         d->ld_site = child->ld_site;
2830
2831         type->typ_refcnt++;
2832         rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2833         if (rc) {
2834                 CERROR("can't init device '%s', rc %d\n", typename, rc);
2835                 GOTO(out_alloc, rc);
2836         }
2837         lu_device_get(d);
2838
2839         RETURN(d);
2840
2841 out_alloc:
2842         ldt->ldt_ops->ldto_device_free(ctx, d);
2843         type->typ_refcnt--;
2844 out_type:
2845         class_put_type(type);
2846 out:
2847         return ERR_PTR(rc);
2848 }
2849
2850 static int mdt_stack_init(const struct lu_context *ctx, 
2851                           struct mdt_device *m, struct lustre_cfg *cfg)
2852 {
2853         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
2854         struct lu_device  *tmp;
2855         struct md_device  *md;
2856         int rc;
2857         ENTRY;
2858
2859         /* init the stack */
2860         tmp = mdt_layer_setup(ctx, LUSTRE_OSD_NAME, d, cfg);
2861         if (IS_ERR(tmp)) {
2862                 RETURN(PTR_ERR(tmp));
2863         }
2864         m->mdt_bottom = lu2dt_dev(tmp);
2865         d = tmp;
2866         tmp = mdt_layer_setup(ctx, LUSTRE_MDD_NAME, d, cfg);
2867         if (IS_ERR(tmp)) {
2868                 GOTO(out, rc = PTR_ERR(tmp));
2869         }
2870         d = tmp;
2871         md = lu2md_dev(d);
2872
2873         tmp = mdt_layer_setup(ctx, LUSTRE_CMM_NAME, d, cfg);
2874         if (IS_ERR(tmp)) {
2875                 GOTO(out, rc = PTR_ERR(tmp));
2876         }
2877         d = tmp;
2878         /*set mdd upcall device*/
2879         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2880
2881         md = lu2md_dev(d);
2882         /*set cmm upcall device*/
2883         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2884
2885         m->mdt_child = lu2md_dev(d);
2886
2887         /* process setup config */
2888         tmp = &m->mdt_md_dev.md_lu_dev;
2889         rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2890         GOTO(out, rc);
2891 out:
2892         /* fini from last known good lu_device */
2893         if (rc)
2894                 mdt_stack_fini(ctx, m, d);
2895
2896         return rc;
2897 }
2898
2899 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2900 {
2901         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
2902         struct lu_site    *ls = d->ld_site;
2903
2904         ENTRY;
2905         target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
2906         ping_evictor_stop();
2907         mdt_stop_ptlrpc_service(m);
2908
2909         if (m->mdt_namespace != NULL) {
2910                 ldlm_namespace_free(m->mdt_namespace, 0);
2911                 m->mdt_namespace = NULL;
2912         }
2913
2914         mdt_seq_fini(ctx, m);
2915         mdt_seq_fini_cli(m);
2916         mdt_fld_fini(ctx, m);
2917
2918         mdt_fs_cleanup(ctx, m);
2919
2920         /* finish the stack */
2921         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2922
2923         if (ls) {
2924                 lu_site_fini(ls);
2925                 OBD_FREE_PTR(ls);
2926                 d->ld_site = NULL;
2927         }
2928         LASSERT(atomic_read(&d->ld_ref) == 0);
2929         md_device_fini(&m->mdt_md_dev);
2930
2931         EXIT;
2932 }
2933
2934 int mdt_postrecov(const struct lu_context *, struct mdt_device *);
2935
2936 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2937                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
2938 {
2939         struct lprocfs_static_vars lvars;
2940         struct mdt_thread_info    *info;
2941         struct obd_device         *obd;
2942         const char                *dev = lustre_cfg_string(cfg, 0);
2943         const char                *num = lustre_cfg_string(cfg, 2);
2944         struct lu_site            *s;
2945         int                        rc;
2946         ENTRY;
2947
2948         info = lu_context_key_get(ctx, &mdt_thread_key);
2949         LASSERT(info != NULL);
2950
2951         obd = class_name2obd(dev);
2952         LASSERT(obd);
2953
2954         spin_lock_init(&m->mdt_transno_lock);
2955         
2956         m->mdt_max_mdsize = MAX_MD_SIZE;
2957         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2958
2959         spin_lock_init(&m->mdt_ioepoch_lock);
2960         /* Temporary. should parse mount option. */
2961         m->mdt_opts.mo_user_xattr = 0;
2962         m->mdt_opts.mo_acl = 0;
2963         m->mdt_opts.mo_compat_resname = 0;
2964         obd->obd_replayable = 1;
2965         spin_lock_init(&m->mdt_client_bitmap_lock);
2966
2967         OBD_ALLOC_PTR(s);
2968         if (s == NULL)
2969                 RETURN(-ENOMEM);
2970
2971         md_device_init(&m->mdt_md_dev, ldt);
2972         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2973         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2974         /* set this lu_device to obd, because error handling need it */
2975         obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
2976
2977         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2978         if (rc) {
2979                 CERROR("can't init lu_site, rc %d\n", rc);
2980                 GOTO(err_free_site, rc);
2981         }
2982
2983         lprocfs_init_vars(mdt, &lvars);
2984         rc = lprocfs_obd_setup(obd, lvars.obd_vars);
2985         if (rc) {
2986                 CERROR("can't init lprocfs, rc %d\n", rc);
2987                 GOTO(err_fini_site, rc);
2988         }
2989         
2990         /* init the stack */
2991         rc = mdt_stack_init(ctx, m, cfg);
2992         if (rc) {
2993                 CERROR("can't init device stack, rc %d\n", rc);
2994                 GOTO(err_fini_site, rc);
2995         }
2996
2997         /* set server index */
2998         LASSERT(num);
2999         s->ls_node_id = simple_strtol(num, NULL, 10);
3000
3001         rc = mdt_fld_init(ctx, obd->obd_name, m);
3002         if (rc)
3003                 GOTO(err_fini_stack, rc);
3004
3005         rc = mdt_seq_init(ctx, obd->obd_name, m);
3006         if (rc)
3007                 GOTO(err_fini_fld, rc);
3008
3009         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3010                  LUSTRE_MDT_NAME"-%p", m);
3011         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3012                                               LDLM_NAMESPACE_SERVER);
3013         if (m->mdt_namespace == NULL)
3014                 GOTO(err_fini_seq, rc = -ENOMEM);
3015
3016         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3017
3018         rc = mdt_start_ptlrpc_service(m);
3019         if (rc)
3020                 GOTO(err_free_ns, rc);
3021
3022         ping_evictor_start();
3023         rc = mdt_fs_setup(ctx, m);
3024         if (rc)
3025                 GOTO(err_stop_service, rc);
3026         if(obd->obd_recovering == 0)
3027                 mdt_postrecov(ctx, m);
3028         RETURN(0);
3029
3030 err_stop_service:
3031         mdt_stop_ptlrpc_service(m);
3032 err_free_ns:
3033         ldlm_namespace_free(m->mdt_namespace, 0);
3034         m->mdt_namespace = NULL;
3035 err_fini_seq:
3036         mdt_seq_fini(ctx, m);
3037 err_fini_fld:
3038         mdt_fld_fini(ctx, m);
3039 err_fini_stack:
3040         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
3041 err_fini_site:
3042         lu_site_fini(s);
3043 err_free_site:
3044         OBD_FREE_PTR(s);
3045
3046         md_device_fini(&m->mdt_md_dev);
3047         return (rc);
3048 }
3049
3050 /* used by MGS to process specific configurations */
3051 static int mdt_process_config(const struct lu_context *ctx,
3052                               struct lu_device *d, struct lustre_cfg *cfg)
3053 {
3054         struct mdt_device *m = mdt_dev(d);
3055         struct md_device *md_next  = m->mdt_child;
3056         struct lu_device *next = md2lu_dev(md_next);
3057         int err;
3058         ENTRY;
3059
3060         switch (cfg->lcfg_command) {
3061         case LCFG_ADD_MDC:
3062                 /*
3063                  * Add mdc hook to get first MDT uuid and connect it to
3064                  * ls->controller to use for seq manager.
3065                  */
3066                 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
3067                 if (err) {
3068                         CERROR("can't initialize controller export, "
3069                                "rc %d\n", err);
3070                 }
3071         default:
3072                 /* others are passed further */
3073                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
3074                 break;
3075         }
3076         RETURN(err);
3077 }
3078
3079 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
3080                                           const struct lu_object_header *hdr,
3081                                           struct lu_device *d)
3082 {
3083         struct mdt_object *mo;
3084
3085         ENTRY;
3086
3087         OBD_ALLOC_PTR(mo);
3088         if (mo != NULL) {
3089                 struct lu_object *o;
3090                 struct lu_object_header *h;
3091
3092                 o = &mo->mot_obj.mo_lu;
3093                 h = &mo->mot_header;
3094                 lu_object_header_init(h);
3095                 lu_object_init(o, h, d);
3096                 lu_object_add_top(h, o);
3097                 o->lo_ops = &mdt_obj_ops;
3098                 RETURN(o);
3099         } else
3100                 RETURN(NULL);
3101 }
3102
3103 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
3104 {
3105         struct mdt_device *d = mdt_dev(o->lo_dev);
3106         struct lu_device  *under;
3107         struct lu_object  *below;
3108         int                rc = 0;
3109         ENTRY;
3110
3111         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3112                PFID(lu_object_fid(o)));
3113
3114         under = &d->mdt_child->md_lu_dev;
3115         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
3116         if (below != NULL) {
3117                 lu_object_add(o, below);
3118         } else
3119                 rc = -ENOMEM;
3120         RETURN(rc);
3121 }
3122
3123 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
3124 {
3125         struct mdt_object *mo = mdt_obj(o);
3126         struct lu_object_header *h;
3127         ENTRY;
3128
3129         h = o->lo_header;
3130         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3131                PFID(lu_object_fid(o)));
3132
3133         lu_object_fini(o);
3134         lu_object_header_fini(h);
3135         OBD_FREE_PTR(mo);
3136         EXIT;
3137 }
3138
3139 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
3140                             lu_printer_t p, const struct lu_object *o)
3141 {
3142         return (*p)(ctxt, cookie, LUSTRE_MDT_NAME"-object@%p", o);
3143 }
3144
3145 static struct lu_device_operations mdt_lu_ops = {
3146         .ldo_object_alloc   = mdt_object_alloc,
3147         .ldo_process_config = mdt_process_config
3148 };
3149
3150 static struct lu_object_operations mdt_obj_ops = {
3151         .loo_object_init    = mdt_object_init,
3152         .loo_object_free    = mdt_object_free,
3153         .loo_object_print   = mdt_object_print
3154 };
3155
3156 /* mds_connect_internal */
3157 static int mdt_connect_internal(struct obd_export *exp,
3158                                 struct mdt_device *mdt,
3159                                 struct obd_connect_data *data)
3160 {
3161         if (data != NULL) {
3162                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3163                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3164
3165                 /* If no known bits (which should not happen, probably,
3166                    as everybody should support LOOKUP and UPDATE bits at least)
3167                    revert to compat mode with plain locks. */
3168                 if (!data->ocd_ibits_known &&
3169                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
3170                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
3171
3172                 if (!mdt->mdt_opts.mo_acl)
3173                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3174
3175                 if (!mdt->mdt_opts.mo_user_xattr)
3176                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3177
3178                 exp->exp_connect_flags = data->ocd_connect_flags;
3179                 data->ocd_version = LUSTRE_VERSION_CODE;
3180                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
3181         }
3182
3183         if (mdt->mdt_opts.mo_acl &&
3184             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3185                 CWARN("%s: MDS requires ACL support but client does not\n",
3186                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3187                 return -EBADE;
3188         }
3189         return 0;
3190 }
3191
3192 /* mds_connect copy */
3193 static int mdt_obd_connect(const struct lu_context *ctx,
3194                            struct lustre_handle *conn, struct obd_device *obd,
3195                            struct obd_uuid *cluuid,
3196                            struct obd_connect_data *data)
3197 {
3198         struct mdt_export_data *med;
3199         struct mdt_client_data *mcd;
3200         struct obd_export      *exp;
3201         struct mdt_device      *mdt;
3202         int                     rc;
3203         ENTRY;
3204
3205         LASSERT(ctx != NULL);
3206         if (!conn || !obd || !cluuid)
3207                 RETURN(-EINVAL);
3208
3209         mdt = mdt_dev(obd->obd_lu_dev);
3210
3211         rc = class_connect(conn, obd, cluuid);
3212         if (rc)
3213                 RETURN(rc);
3214
3215         exp = class_conn2export(conn);
3216         LASSERT(exp != NULL);
3217         med = &exp->exp_mdt_data;
3218         
3219         rc = mdt_connect_internal(exp, mdt, data);
3220         if (rc == 0) {
3221                 OBD_ALLOC_PTR(mcd);
3222                 if (mcd != NULL) {
3223                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
3224                         med->med_mcd = mcd;
3225                         rc = mdt_client_new(ctx, mdt, med);
3226                         if (rc != 0)
3227                                 OBD_FREE_PTR(mcd);
3228                 } else
3229                         rc = -ENOMEM;
3230         }
3231
3232         if (rc != 0)
3233                 class_disconnect(exp);
3234         else
3235                 class_export_put(exp);
3236
3237         RETURN(rc);
3238 }
3239
3240 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
3241                              struct obd_uuid *cluuid,
3242                              struct obd_connect_data *data)
3243 {
3244         int rc;
3245         ENTRY;
3246
3247         if (exp == NULL || obd == NULL || cluuid == NULL)
3248                 RETURN(-EINVAL);
3249
3250         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
3251
3252         RETURN(rc);
3253 }
3254
3255 static int mdt_obd_disconnect(struct obd_export *exp)
3256 {
3257         struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3258         int rc;
3259         ENTRY;
3260
3261         LASSERT(exp);
3262         class_export_get(exp);
3263
3264         /* Disconnect early so that clients can't keep using export */
3265         rc = class_disconnect(exp);
3266         if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
3267                 ldlm_cancel_locks_for_export(exp);
3268
3269         /* complete all outstanding replies */
3270         spin_lock(&exp->exp_lock);
3271         while (!list_empty(&exp->exp_outstanding_replies)) {
3272                 struct ptlrpc_reply_state *rs =
3273                         list_entry(exp->exp_outstanding_replies.next,
3274                                    struct ptlrpc_reply_state, rs_exp_list);
3275                 struct ptlrpc_service *svc = rs->rs_service;
3276
3277                 spin_lock(&svc->srv_lock);
3278                 list_del_init(&rs->rs_exp_list);
3279                 ptlrpc_schedule_difficult_reply(rs);
3280                 spin_unlock(&svc->srv_lock);
3281         }
3282         spin_unlock(&exp->exp_lock);
3283
3284         class_export_put(exp);
3285         RETURN(rc);
3286 }
3287
3288 /* FIXME: Can we avoid using these two interfaces? */
3289 static int mdt_init_export(struct obd_export *exp)
3290 {
3291         struct mdt_export_data *med = &exp->exp_mdt_data;
3292         ENTRY;
3293
3294         INIT_LIST_HEAD(&med->med_open_head);
3295         spin_lock_init(&med->med_open_lock);
3296         exp->exp_connecting = 1;
3297         RETURN(0);
3298 }
3299
3300 static int mdt_destroy_export(struct obd_export *export)
3301 {
3302         struct mdt_export_data *med;
3303         struct obd_device      *obd = export->exp_obd;
3304         struct mdt_device      *mdt;
3305         struct mdt_thread_info *info;
3306         struct lu_context       ctxt;
3307         struct md_attr         *ma;
3308         int rc = 0;
3309         ENTRY;
3310
3311         med = &export->exp_mdt_data;
3312
3313         target_destroy_export(export);
3314
3315         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3316                 RETURN(0);
3317
3318         mdt = mdt_dev(obd->obd_lu_dev);
3319         LASSERT(mdt != NULL);
3320
3321         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3322         if (rc)
3323                 RETURN(rc);
3324
3325         lu_context_enter(&ctxt);
3326
3327         info = lu_context_key_get(&ctxt, &mdt_thread_key);
3328         LASSERT(info != NULL);
3329         memset(info, 0, sizeof *info);
3330         info->mti_ctxt = &ctxt;
3331         info->mti_mdt = mdt;
3332
3333         ma = &info->mti_attr;
3334         ma->ma_lmm_size = mdt->mdt_max_mdsize;
3335         ma->ma_cookie_size = mdt->mdt_max_cookiesize;
3336         OBD_ALLOC(ma->ma_lmm, mdt->mdt_max_mdsize);
3337         OBD_ALLOC(ma->ma_cookie, mdt->mdt_max_cookiesize);
3338
3339         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
3340                 GOTO(out, rc = -ENOMEM);
3341         ma->ma_need = MA_LOV | MA_COOKIE;
3342
3343         /* Close any open files (which may also cause orphan unlinking). */
3344         spin_lock(&med->med_open_lock);
3345         while (!list_empty(&med->med_open_head)) {
3346                 struct list_head *tmp = med->med_open_head.next;
3347                 struct mdt_file_data *mfd =
3348                         list_entry(tmp, struct mdt_file_data, mfd_list);
3349                 struct md_attr *ma = &info->mti_attr;
3350
3351                 /* Remove mfd handle so it can't be found again.
3352                  * We are consuming the mfd_list reference here. */
3353                 class_handle_unhash(&mfd->mfd_handle);
3354                 list_del_init(&mfd->mfd_list);
3355                 spin_unlock(&med->med_open_lock);
3356                 mdt_mfd_close(info, mfd);
3357                 /* TODO: if we close the unlinked file,
3358                  * we need to remove it's objects from OST */
3359                 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
3360                 spin_lock(&med->med_open_lock);
3361         }
3362         spin_unlock(&med->med_open_lock);
3363         info->mti_mdt = NULL;
3364         mdt_client_del(&ctxt, mdt, med);
3365
3366 out:
3367         if (ma->ma_lmm)
3368                 OBD_FREE(ma->ma_lmm, mdt->mdt_max_mdsize);
3369         if (ma->ma_cookie)
3370                 OBD_FREE(ma->ma_cookie, mdt->mdt_max_cookiesize);
3371         lu_context_exit(&ctxt);
3372         lu_context_fini(&ctxt);
3373
3374         RETURN(rc);
3375 }
3376
3377 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
3378                       enum md_upcall_event ev)
3379 {
3380         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3381         struct md_device  *next  = m->mdt_child;
3382         struct mdt_thread_info *mti;
3383         int rc = 0;
3384         ENTRY;
3385
3386         switch (ev) {
3387                 case MD_LOV_SYNC:
3388                         rc = next->md_ops->mdo_maxsize_get(ctx, next,
3389                                         &m->mdt_max_mdsize,
3390                                         &m->mdt_max_cookiesize);
3391                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3392                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
3393                         break;
3394                 case MD_NO_TRANS:
3395                         mti = lu_context_key_get(ctx, &mdt_thread_key);
3396                         mti->mti_no_need_trans = 1;
3397                         CDEBUG(D_INFO, "disable mdt trans for this thread\n");
3398                         break;
3399                 default:
3400                         CERROR("invalid event\n");
3401                         rc = -EINVAL;
3402                         break;
3403         }
3404         RETURN(rc);
3405 }
3406
3407 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3408                          void *karg, void *uarg)
3409 {
3410         struct lu_context  ctxt;
3411         struct obd_device *obd= exp->exp_obd;
3412         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3413         struct dt_device  *dt = mdt->mdt_bottom;
3414         int rc;
3415
3416         ENTRY;
3417         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3418         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3419         if (rc)
3420                 RETURN(rc);
3421         lu_context_enter(&ctxt);
3422
3423         switch (cmd) {
3424         case OBD_IOC_SYNC:
3425                 rc = dt->dd_ops->dt_sync(&ctxt, dt);
3426                 break;
3427
3428         case OBD_IOC_SET_READONLY: 
3429                 rc = dt->dd_ops->dt_sync(&ctxt, dt);
3430                 dt->dd_ops->dt_ro(&ctxt, dt);
3431                 break;
3432
3433         case OBD_IOC_ABORT_RECOVERY:
3434                 CERROR("aborting recovery for device %s\n", obd->obd_name);
3435                 target_abort_recovery(obd);
3436                 break;
3437
3438         default:
3439                 CERROR("not supported cmd = %d for device %s\n",
3440                        cmd, obd->obd_name);
3441                 rc = -EOPNOTSUPP;
3442         }
3443
3444         lu_context_exit(&ctxt);
3445         lu_context_fini(&ctxt);
3446         RETURN(rc);
3447 }
3448
3449 int mdt_postrecov(const struct lu_context *ctx, struct mdt_device *mdt)
3450 {
3451         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
3452         int rc;
3453         ENTRY;
3454         rc = ld->ld_ops->ldo_recovery_complete(ctx, ld);
3455         RETURN(rc);
3456 }
3457
3458 int mdt_obd_postrecov(struct obd_device *obd)
3459 {
3460         struct lu_context ctxt;
3461         int rc;
3462
3463         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3464         if (rc)
3465                 RETURN(rc);
3466         lu_context_enter(&ctxt);
3467         rc = mdt_postrecov(&ctxt, mdt_dev(obd->obd_lu_dev));
3468         lu_context_exit(&ctxt);
3469         lu_context_fini(&ctxt);
3470         return rc;
3471 }
3472
3473 static struct obd_ops mdt_obd_device_ops = {
3474         .o_owner          = THIS_MODULE,
3475         .o_connect        = mdt_obd_connect,
3476         .o_reconnect      = mdt_obd_reconnect,
3477         .o_disconnect     = mdt_obd_disconnect,
3478         .o_init_export    = mdt_init_export,
3479         .o_destroy_export = mdt_destroy_export,
3480         .o_iocontrol      = mdt_iocontrol,
3481         .o_postrecov      = mdt_obd_postrecov
3482
3483 };
3484
3485 static struct lu_device* mdt_device_fini(const struct lu_context *ctx,
3486                                          struct lu_device *d)
3487 {
3488         struct mdt_device *m = mdt_dev(d);
3489
3490         mdt_fini(ctx, m);
3491         RETURN(NULL);
3492 }
3493
/*
 * ldto_device_free method: release the mdt_device that contains \a d.
 */
static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
{
        struct mdt_device *mdt = mdt_dev(d);

        OBD_FREE_PTR(mdt);
}
3500
3501 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
3502                                           struct lu_device_type *t,
3503                                           struct lustre_cfg *cfg)
3504 {
3505         struct lu_device  *l;
3506         struct mdt_device *m;
3507
3508         OBD_ALLOC_PTR(m);
3509         if (m != NULL) {
3510                 int rc;
3511
3512                 l = &m->mdt_md_dev.md_lu_dev;
3513                 rc = mdt_init0(ctx, m, t, cfg);
3514                 if (rc != 0) {
3515                         OBD_FREE_PTR(m);
3516                         l = ERR_PTR(rc);
3517                         return l;
3518                 }
3519                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
3520         } else
3521                 l = ERR_PTR(-ENOMEM);
3522         return l;
3523 }
3524
3525 /*
3526  * context key constructor/destructor
3527  */
3528 static void *mdt_thread_init(const struct lu_context *ctx,
3529                              struct lu_context_key *key)
3530 {
3531         struct mdt_thread_info *info;
3532
3533         /*
3534          * check that no high order allocations are incurred.
3535          */
3536         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3537         OBD_ALLOC_PTR(info);
3538         if (info == NULL)
3539                 info = ERR_PTR(-ENOMEM);
3540         return info;
3541 }
3542
/*
 * lct_fini method of mdt_thread_key: free the mdt_thread_info passed in
 * \a data.
 */
static void mdt_thread_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *mti = data;

        OBD_FREE_PTR(mti);
}
3549
3550 struct lu_context_key mdt_thread_key = {
3551         .lct_tags = LCT_MD_THREAD,
3552         .lct_init = mdt_thread_init,
3553         .lct_fini = mdt_thread_fini
3554 };
3555
3556 static void *mdt_txn_init(const struct lu_context *ctx,
3557                              struct lu_context_key *key)
3558 {
3559         struct mdt_txn_info *txi;
3560
3561         /*
3562          * check that no high order allocations are incurred.
3563          */
3564         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
3565         OBD_ALLOC_PTR(txi);
3566         if (txi == NULL)
3567                 txi = ERR_PTR(-ENOMEM);
3568         return txi;
3569 }
3570
/*
 * lct_fini method of mdt_txn_key: free the mdt_txn_info passed in \a data.
 */
static void mdt_txn_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *info = data;

        OBD_FREE_PTR(info);
}
3577
3578 struct lu_context_key mdt_txn_key = {
3579         .lct_tags = LCT_TX_HANDLE,
3580         .lct_init = mdt_txn_init,
3581         .lct_fini = mdt_txn_fini
3582 };
3583
3584
3585 static int mdt_type_init(struct lu_device_type *t)
3586 {
3587         int rc;
3588
3589         rc = lu_context_key_register(&mdt_thread_key);
3590         if (rc == 0)
3591                 rc = lu_context_key_register(&mdt_txn_key);
3592         return rc;
3593 }
3594
3595 static void mdt_type_fini(struct lu_device_type *t)
3596 {
3597         lu_context_key_degister(&mdt_thread_key);
3598         lu_context_key_degister(&mdt_txn_key);
3599 }
3600
3601 static struct lu_device_type_operations mdt_device_type_ops = {
3602         .ldto_init = mdt_type_init,
3603         .ldto_fini = mdt_type_fini,
3604
3605         .ldto_device_alloc = mdt_device_alloc,
3606         .ldto_device_free  = mdt_device_free,
3607         .ldto_device_fini  = mdt_device_fini
3608 };
3609
3610 static struct lu_device_type mdt_device_type = {
3611         .ldt_tags     = LU_DEVICE_MD,
3612         .ldt_name     = LUSTRE_MDT_NAME,
3613         .ldt_ops      = &mdt_device_type_ops,
3614         .ldt_ctx_tags = LCT_MD_THREAD
3615 };
3616
3617 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
3618         { "uuid",            lprocfs_rd_uuid,                0, 0 },
3619         { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 },
3620         { "num_exports",     lprocfs_rd_num_exports,         0, 0 },
3621         { 0 }
3622 };
3623
3624 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
3625         { "num_refs",        lprocfs_rd_numrefs,             0, 0 },
3626         { 0 }
3627 };
3628
3629 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
3630
3631 static int __init mdt_mod_init(void)
3632 {
3633         struct lprocfs_static_vars lvars;
3634         int rc;
3635
3636         printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
3637         
3638         mdt_num_threads = MDT_NUM_THREADS;
3639         lprocfs_init_vars(mdt, &lvars);
3640         rc = class_register_type(&mdt_obd_device_ops, NULL,
3641                                  lvars.module_vars, LUSTRE_MDT_NAME,
3642                                  &mdt_device_type);
3643         return rc;
3644 }
3645
3646 static void __exit mdt_mod_exit(void)
3647 {
3648         class_unregister_type(LUSTRE_MDT_NAME);
3649 }
3650
3651
/*
 * Build one designated entry of a struct mdt_handler array.
 *
 * The entry is placed at index (prefix_opc - prefix_base), i.e. at the
 * offset of the opcode from the base opcode of its family.
 *
 *   prefix  opcode family prefix (MDS, OBD, LDLM, ...)
 *   base    opcode of the family that maps to index 0
 *   suffix  token appended to the OBD_FAIL_<prefix>_<opc> fail id (may be
 *           empty)
 *   flags   value for mh_flags
 *   opc     opcode name without the prefix; also used as mh_name
 *   fn      handler function
 *   fmt     request format, or NULL
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
[prefix ## _ ## opc - prefix ## _ ## base] = {              \
        .mh_name    = #opc,                                 \
        .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
        .mh_opc     = prefix ## _ ## opc,                   \
        .mh_flags   = flags,                                \
        .mh_act     = fn,                                   \
        .mh_fmt     = fmt                                   \
}
3661
/*
 * Per-family wrappers around DEF_HNDL().  Each one fixes the opcode prefix,
 * the base opcode of the family and the "_NET" fail id suffix.
 *
 * Variants taking an explicit request format:
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt) \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)

#define DEF_SEQ_HNDL(flags, name, fn, fmt) \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)

#define DEF_FLD_HNDL(flags, name, fn, fmt) \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)

/*
 * Request with a format known in advance: the format is the RQF_<family>_
 * object named after the opcode.
 */
#define DEF_MDT_HNDL_F(flags, name, fn) \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)

#define DEF_SEQ_HNDL_F(flags, name, fn) \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)

#define DEF_FLD_HNDL_F(flags, name, fn) \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)

/*
 * Request with a format we do not yet know: mh_fmt is left NULL.
 */
#define DEF_MDT_HNDL_0(flags, name, fn) \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
3686
3687 static struct mdt_handler mdt_mds_ops[] = {
3688 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
3689 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
3690 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
3691 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR,      mdt_getattr),
3692 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
3693 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
3694                                           SETXATTR,     mdt_setxattr),
3695 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
3696 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
3697 DEF_MDT_HNDL_F(0                        |MUTABOR,
3698                                           REINT,        mdt_reint),
3699 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
3700 DEF_MDT_HNDL_F(HABEO_CORPUS             , DONE_WRITING, mdt_done_writing),
3701 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
3702 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
3703 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
3704 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
3705 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
3706 };
3707
3708 #define DEF_OBD_HNDL(flags, name, fn)                   \
3709         DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
3710
3711
3712 static struct mdt_handler mdt_obd_ops[] = {
3713         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
3714         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
3715         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
3716 };
3717
3718 #define DEF_DLM_HNDL_0(flags, name, fn)                   \
3719         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
3720 #define DEF_DLM_HNDL_F(flags, name, fn)                   \
3721         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
3722
3723 static struct mdt_handler mdt_dlm_ops[] = {
3724         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
3725         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
3726         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
3727         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
3728 };
3729
3730 static struct mdt_handler mdt_llog_ops[] = {
3731 };
3732
3733 #define DEF_SEC_CTX_HNDL(name, fn)                      \
3734         DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
3735
3736 static struct mdt_handler mdt_sec_ctx_ops[] = {
3737         DEF_SEC_CTX_HNDL(INIT,          mdt_sec_ctx_handle),
3738         DEF_SEC_CTX_HNDL(INIT_CONT,     mdt_sec_ctx_handle),
3739         DEF_SEC_CTX_HNDL(FINI,          mdt_sec_ctx_handle)
3740 };
3741
3742 static struct mdt_opc_slice mdt_regular_handlers[] = {
3743         {
3744                 .mos_opc_start = MDS_GETATTR,
3745                 .mos_opc_end   = MDS_LAST_OPC,
3746                 .mos_hs        = mdt_mds_ops
3747         },
3748         {
3749                 .mos_opc_start = OBD_PING,
3750                 .mos_opc_end   = OBD_LAST_OPC,
3751                 .mos_hs        = mdt_obd_ops
3752         },
3753         {
3754                 .mos_opc_start = LDLM_ENQUEUE,
3755                 .mos_opc_end   = LDLM_LAST_OPC,
3756                 .mos_hs        = mdt_dlm_ops
3757         },
3758         {
3759                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
3760                 .mos_opc_end   = LLOG_LAST_OPC,
3761                 .mos_hs        = mdt_llog_ops
3762         },
3763         {
3764                 .mos_opc_start = SEC_CTX_INIT,
3765                 .mos_opc_end   = SEC_LAST_OPC,
3766                 .mos_hs        = mdt_sec_ctx_ops
3767         },
3768         {
3769                 .mos_hs        = NULL
3770         }
3771 };
3772
3773 static struct mdt_handler mdt_readpage_ops[] = {
3774         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
3775 #ifdef HAVE_SPLIT_SUPPORT
3776         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
3777 #endif
3778
3779         /*
3780          * XXX: this is ugly and should be fixed one day, see mdc_close() for
3781          * detailed comments. --umka
3782          */
3783         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
3784         DEF_MDT_HNDL_F(HABEO_CORPUS,              DONE_WRITING,    mdt_done_writing),
3785 };
3786
3787 static struct mdt_opc_slice mdt_readpage_handlers[] = {
3788         {
3789                 .mos_opc_start = MDS_GETATTR,
3790                 .mos_opc_end   = MDS_LAST_OPC,
3791                 .mos_hs        = mdt_readpage_ops
3792         },
3793         {
3794                 .mos_hs        = NULL
3795         }
3796 };
3797
3798 static struct mdt_handler mdt_seq_ops[] = {
3799         DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
3800 };
3801
3802 static struct mdt_opc_slice mdt_seq_handlers[] = {
3803         {
3804                 .mos_opc_start = SEQ_QUERY,
3805                 .mos_opc_end   = SEQ_LAST_OPC,
3806                 .mos_hs        = mdt_seq_ops
3807         },
3808         {
3809                 .mos_hs        = NULL
3810         }
3811 };
3812
3813 static struct mdt_handler mdt_fld_ops[] = {
3814         DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
3815 };
3816
3817 static struct mdt_opc_slice mdt_fld_handlers[] = {
3818         {
3819                 .mos_opc_start = FLD_QUERY,
3820                 .mos_opc_end   = FLD_LAST_OPC,
3821                 .mos_hs        = mdt_fld_ops
3822         },
3823         {
3824                 .mos_hs        = NULL
3825         }
3826 };
3827
3828 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3829 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
3830 MODULE_LICENSE("GPL");
3831
3832 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
3833                 "number of mdt service threads to start");
3834
3835 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);