Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * MDT_FAIL_CHECK
46  */
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
53 #include <obd.h>
54 /* lu2dt_dev() */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include "mdt_internal.h"
58 #include <linux/lustre_acl.h>
/* Number of MDT threads; initialized in mdt_mod_init(). */
unsigned long mdt_num_threads;
63
64 /* ptlrpc request handler for MDT. All handlers are
65  * grouped into several slices - struct mdt_opc_slice,
66  * and stored in an array - mdt_handlers[].
67  */
68 struct mdt_handler {
69         /* The name of this handler. */
70         const char *mh_name;
71         /* Fail id for this handler, checked at the beginning of this handler*/
72         int         mh_fail_id;
73         /* Operation code for this handler */
74         __u32       mh_opc;
75         /* flags are listed in enum mdt_handler_flags below. */
76         __u32       mh_flags;
77         /* The actual handler function to execute. */
78         int (*mh_act)(struct mdt_thread_info *info);
79         /* Request format for this request. */
80         const struct req_format *mh_fmt;
81 };
82
/* Per-handler flags stored in mdt_handler::mh_flags. */
enum mdt_handler_flags {
        /*
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         *
         * "habeo corpus" == "I have a body"
         */
        HABEO_CORPUS = 0x1,
        /*
         * The incoming message carries a struct ldlm_request.
         *
         * "habeo clavis" == "I have a key"
         */
        HABEO_CLAVIS = 0x2,
        /*
         * The request has a fixed reply format, so the reply message can
         * be packed by generic code.
         *
         * "habeo refero" == "I have a reply"
         */
        HABEO_REFERO = 0x4,
        /*
         * The request modifies something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client as early as
         * possible.
         *
         * "mutabor" == "I shall modify"
         */
        MUTABOR      = 0x8
};
112
113 struct mdt_opc_slice {
114         __u32               mos_opc_start;
115         int                 mos_opc_end;
116         struct mdt_handler *mos_hs;
117 };
118
119 static struct mdt_opc_slice mdt_regular_handlers[];
120 static struct mdt_opc_slice mdt_readpage_handlers[];
121
122 static struct mdt_device *mdt_dev(struct lu_device *d);
123 static int mdt_regular_handle(struct ptlrpc_request *req);
124 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
125
126 static struct lu_object_operations mdt_obj_ops;
127
128 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
129 {
130         if (!rep)
131                 return 0;
132         return (rep->lock_policy_res1 & flag);
133 }
134
135 void mdt_set_disposition(struct mdt_thread_info *info,
136                                 struct ldlm_reply *rep, int flag)
137 {
138         if (info)
139                 info->mti_opdata |= flag;
140         if (rep)
141                 rep->lock_policy_res1 |= flag;
142 }
143
144
145 static int mdt_getstatus(struct mdt_thread_info *info)
146 {
147         struct md_device *next  = info->mti_mdt->mdt_child;
148         int               result;
149         struct mdt_body  *body;
150
151         ENTRY;
152
153         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
154                 result = -ENOMEM;
155         else {
156                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
157                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
158                                                     next, &body->fid1);
159                 if (result == 0)
160                         body->valid |= OBD_MD_FLID;
161         }
162
163         RETURN(result);
164 }
165
166 static int mdt_statfs(struct mdt_thread_info *info)
167 {
168         struct md_device  *next  = info->mti_mdt->mdt_child;
169         struct obd_statfs *osfs;
170         int                result;
171
172         ENTRY;
173
174         /* This will trigger a watchdog timeout */
175         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
176                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
177         
178
179         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
180                 result = -ENOMEM;
181         } else {
182                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
183                 /* XXX max_age optimisation is needed here. See mds_statfs */
184                 result = next->md_ops->mdo_statfs(info->mti_ctxt,
185                                                   next, &info->mti_u.ksfs);
186                 statfs_pack(osfs, &info->mti_u.ksfs);
187         }
188
189         RETURN(result);
190 }
191
192 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
193                         const struct lu_fid *fid)
194 {
195         /*XXX should pack the reply body according to lu_valid*/
196         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
197                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
198                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
199                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
200
201         if (!S_ISREG(attr->la_mode))
202                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
203
204         b->atime      = attr->la_atime;
205         b->mtime      = attr->la_mtime;
206         b->ctime      = attr->la_ctime;
207         b->mode       = attr->la_mode;
208         b->size       = attr->la_size;
209         b->blocks     = attr->la_blocks;
210         b->uid        = attr->la_uid;
211         b->gid        = attr->la_gid;
212         b->flags      = attr->la_flags;
213         b->nlink      = attr->la_nlink;
214         b->rdev       = attr->la_rdev;
215
216         if (fid) {
217                 b->fid1 = *fid;
218                 b->valid |= OBD_MD_FLID;
219                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
220                                 PFID(fid), b->nlink, b->mode, b->size);
221         }
222 }
223
224 static inline int mdt_body_has_lov(const struct lu_attr *la,
225                                    const struct mdt_body *body)
226 {
227         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
228                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
229 }
230
231 static int mdt_getattr_internal(struct mdt_thread_info *info,
232                                 struct mdt_object *o)
233 {
234         struct md_object        *next = mdt_object_child(o);
235         const struct mdt_body   *reqbody = info->mti_body;
236         struct ptlrpc_request   *req = mdt_info_req(info);
237         struct md_attr          *ma = &info->mti_attr;
238         struct lu_attr          *la = &ma->ma_attr;
239         struct req_capsule      *pill = &info->mti_pill;
240         const struct lu_context *ctxt = info->mti_ctxt;
241         struct mdt_body         *repbody;
242         void                    *buffer;
243         int                     length;
244         int                     rc;
245         ENTRY;
246
247         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
248                 RETURN(-ENOMEM);
249         
250         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
251         repbody->eadatasize = 0;
252         repbody->aclsize = 0;
253
254         ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
255         ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
256
257         ma->ma_need = MA_INODE | MA_LOV;
258         rc = mo_attr_get(ctxt, next, ma);
259         if (rc == -EREMOTE) {
260                 /* This object is located on remote node.*/
261                 repbody->fid1 = *mdt_object_fid(o);
262                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
263                 RETURN(0);
264         } else if (rc){
265                 CERROR("getattr error for "DFID": %d\n",
266                         PFID(mdt_object_fid(o)), rc);
267                 RETURN(rc);
268         }
269
270         if (ma->ma_valid & MA_INODE)
271                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
272         else
273                 RETURN(-EFAULT);
274
275         if (mdt_body_has_lov(la, reqbody)) {
276                 if (ma->ma_valid & MA_LOV) {
277                         LASSERT(ma->ma_lmm_size);
278                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
279                         repbody->eadatasize = ma->ma_lmm_size;
280                         if (S_ISDIR(la->la_mode))
281                                 repbody->valid |= OBD_MD_FLDIREA;
282                         else
283                                 repbody->valid |= OBD_MD_FLEASIZE;
284                 }
285         } else if (S_ISLNK(la->la_mode) &&
286                           reqbody->valid & OBD_MD_LINKNAME) {
287                 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
288                 if (rc <= 0) {
289                         CERROR("readlink failed: %d\n", rc);
290                         rc = -EFAULT;
291                 } else {
292                         repbody->valid |= OBD_MD_LINKNAME;
293                         repbody->eadatasize = rc + 1;
294                         ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
295                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
296                                         (char*)ma->ma_lmm, rc);
297                         rc = 0;
298                 }
299         }
300
301         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
302                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
303                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
304                 repbody->valid |= OBD_MD_FLMODEASIZE;
305                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
306                                 "MAX_COOKIE to : %d:%d\n",
307                                 repbody->max_mdsize,
308                                 repbody->max_cookiesize);
309         }
310
311 #ifdef CONFIG_FS_POSIX_ACL
312         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
313             (reqbody->valid & OBD_MD_FLACL)) {
314                 buffer = req_capsule_server_get(pill, &RMF_ACL);
315                 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
316                 if (length > 0) {
317                         rc = mo_xattr_get(ctxt, next, buffer,
318                                           length, XATTR_NAME_ACL_ACCESS);
319                         if (rc < 0) {
320                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
321                                         rc = 0;
322                                 else
323                                         CERROR("got acl size: %d\n", rc);
324                         } else {
325                                 repbody->aclsize = rc;
326                                 repbody->valid |= OBD_MD_FLACL;
327                         }
328                 }
329         }
330 #endif
331
332         RETURN(rc);
333 }
334
335 static int mdt_getattr(struct mdt_thread_info *info)
336 {
337         int result;
338         struct mdt_object *obj;
339
340         obj = info->mti_object;
341         LASSERT(obj != NULL);
342         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
343         ENTRY;
344
345         result = mdt_getattr_internal(info, obj);
346         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
347         RETURN(result);
348 }
349
350 /*
351  * UPDATE lock should be taken against parent, and be release before exit;
352  * child_bits lock should be taken against child, and be returned back:
353  *            (1)normal request should release the child lock;
354  *            (2)intent request will grant the lock to client.
355  */
356 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
357                                  struct mdt_lock_handle *lhc,
358                                  __u64 child_bits,
359                                  struct ldlm_reply *ldlm_rep)
360 {
361         struct mdt_object *parent = info->mti_object;
362         struct mdt_object *child;
363         struct md_object  *next = mdt_object_child(info->mti_object);
364         struct lu_fid     *child_fid = &info->mti_tmp_fid1;
365         const char        *name;
366         int               result;
367         struct mdt_lock_handle *lhp;
368         ENTRY;
369
370         LASSERT(info->mti_object != NULL);
371         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
372         if (name == NULL)
373                 RETURN(-EFAULT);
374
375         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
376                         PFID(mdt_object_fid(parent)), name, ldlm_rep);
377
378         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
379         if (strlen(name) == 0) {
380                 /* only getattr on the child. parent is on another node. */
381                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
382                 child = parent;
383                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
384                                ", ldlm_rep=%p\n",
385                                PFID(mdt_object_fid(child)), ldlm_rep);
386
387                 mdt_lock_handle_init(lhc);
388                 lhc->mlh_mode = LCK_CR;
389                 result = mdt_object_lock(info, child, lhc, child_bits);
390                 if (result == 0) {
391                         /* finally, we can get attr for child. */
392                         result = mdt_getattr_internal(info, child);
393                         if (result != 0)
394                                 mdt_object_unlock(info, child, lhc, 1);
395                 }
396                 GOTO(out, result);
397         }
398
399         /*step 1: lock parent */
400         lhp = &info->mti_lh[MDT_LH_PARENT];
401         lhp->mlh_mode = LCK_CR;
402         result = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
403         if (result != 0)
404                 RETURN(result);
405
406         /*step 2: lookup child's fid by name */
407         result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
408         if (result != 0) {
409                 if (result == -ENOENT)
410                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
411                 GOTO(out_parent, result);
412         } else
413                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
414         /*
415          *step 3: find the child object by fid & lock it.
416          *        regardless if it is local or remote.
417          */
418         mdt_lock_handle_init(lhc);
419         lhc->mlh_mode = LCK_CR;
420         child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
421         if (IS_ERR(child))
422                 GOTO(out_parent, result = PTR_ERR(child));
423
424         /* finally, we can get attr for child. */
425         result = mdt_getattr_internal(info, child);
426         if (result != 0)
427                 mdt_object_unlock(info, child, lhc, 1);
428         else {
429                 /* This is pure debugging code. */
430                 struct ldlm_lock *lock;
431                 struct ldlm_res_id *res_id;
432                 lock = ldlm_handle2lock(&lhc->mlh_lh);
433                 if (lock) {
434                         res_id = &lock->l_resource->lr_name;
435                         LDLM_DEBUG(lock, "we will return this lock client\n");
436                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
437                                                  &lock->l_resource->lr_name),
438                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
439                                 (unsigned long)res_id->name[0],
440                                 (unsigned long)res_id->name[1],
441                                 (unsigned long)res_id->name[2],
442                                 PFID(mdt_object_fid(child)));
443                         LDLM_LOCK_PUT(lock);
444                 }
445         }
446         mdt_object_put(info->mti_ctxt, child);
447
448         EXIT;
449 out_parent:
450         mdt_object_unlock(info, parent, lhp, 1);
451 out:
452         return result;
453 }
454
455 /* normal handler: should release the child lock */
456 static int mdt_getattr_name(struct mdt_thread_info *info)
457 {
458         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
459         int rc;
460
461         ENTRY;
462
463         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
464         if (lustre_handle_is_used(&lhc->mlh_lh)) {
465                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
466                 lhc->mlh_lh.cookie = 0;
467         }
468         mdt_shrink_reply(info, REPLY_REC_OFF + 1);
469         RETURN(rc);
470 }
471
472 static struct lu_device_operations mdt_lu_ops;
473
474 static int lu_device_is_mdt(struct lu_device *d)
475 {
476         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
477 }
478
479 static struct mdt_device *mdt_dev(struct lu_device *d)
480 {
481         LASSERT(lu_device_is_mdt(d));
482         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
483 }
484
485 static int mdt_connect(struct mdt_thread_info *info)
486 {
487         int result;
488         struct ptlrpc_request *req;
489
490         req = mdt_info_req(info);
491         result = target_handle_connect(req, mdt_regular_handle);
492         if (result == 0) {
493                 LASSERT(req->rq_export != NULL);
494                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
495         }
496         return result;
497 }
498
/* Disconnect handler: delegates to target_handle_disconnect(). */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        struct ptlrpc_request *req = mdt_info_req(info);

        return target_handle_disconnect(req);
}
503
504 static int mdt_sendpage(struct mdt_thread_info *info,
505                         struct lu_rdpg *rdpg)
506 {
507         struct ptlrpc_request   *req = mdt_info_req(info);
508         struct ptlrpc_bulk_desc *desc;
509         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
510         int                      tmpcount;
511         int                      tmpsize;
512         int                      i;
513         int                      rc;
514         ENTRY;
515
516         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
517                                     MDS_BULK_PORTAL);
518         if (desc == NULL)
519                 GOTO(out, rc = -ENOMEM);
520
521         for (i = 0, tmpcount = rdpg->rp_count;
522                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
523                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
524                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
525         }
526
527         LASSERT(desc->bd_nob == rdpg->rp_count);
528         rc = ptlrpc_start_bulk_transfer(desc);
529         if (rc)
530                 GOTO(free_desc, rc);
531
532         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
533                 GOTO(abort_bulk, rc);
534
535         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
536         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
537         LASSERT (rc == 0 || rc == -ETIMEDOUT);
538
539         if (rc == 0) {
540                 if (desc->bd_success &&
541                     desc->bd_nob_transferred == rdpg->rp_count)
542                         GOTO(free_desc, rc);
543
544                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
545         }
546
547         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
548                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
549                   desc->bd_nob_transferred, rdpg->rp_count,
550                   req->rq_export->exp_client_uuid.uuid,
551                   req->rq_export->exp_connection->c_remote_uuid.uuid);
552
553         class_fail_export(req->rq_export);
554
555         EXIT;
556 abort_bulk:
557         ptlrpc_abort_bulk(desc);
558 free_desc:
559         ptlrpc_free_bulk(desc);
560 out:
561         return rc;
562 }
563
564 static int mdt_readpage(struct mdt_thread_info *info)
565 {
566         struct mdt_object *object = info->mti_object;
567         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
568         struct mdt_body   *reqbody;
569         struct mdt_body   *repbody;
570         int                rc;
571         int                i;
572         ENTRY;
573
574         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
575                 RETURN(-ENOMEM);
576
577         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
578         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
579         if (reqbody == NULL || repbody == NULL)
580                 RETURN(-EFAULT);
581
582         /*
583          * prepare @rdpg before calling lower layers and transfer itself. Here
584          * reqbody->size contains offset of where to start to read and
585          * reqbody->nlink contains number bytes to read.
586          */
587         rdpg->rp_hash = reqbody->size;
588         rdpg->rp_hash_end = ~0ul;
589         if ((__u64)rdpg->rp_hash != reqbody->size) {
590                 CERROR("Invalid hash: %#llx != %#llx\n",
591                        (__u64)rdpg->rp_hash, reqbody->size);
592                 RETURN(-EFAULT);
593         }
594         rdpg->rp_count  = reqbody->nlink;
595         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
596         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
597         if (rdpg->rp_pages == NULL)
598                 RETURN(-ENOMEM);
599
600         for (i = 0; i < rdpg->rp_npages; ++i) {
601                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
602                 if (rdpg->rp_pages[i] == NULL)
603                         GOTO(free_rdpg, rc = -ENOMEM);
604         }
605
606         /* call lower layers to fill allocated pages with directory data */
607         rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
608         if (rc)
609                 GOTO(free_rdpg, rc);
610
611         /* send pages to client */
612         rc = mdt_sendpage(info, rdpg);
613
614         EXIT;
615 free_rdpg:
616         for (i = 0; i < rdpg->rp_npages; i++)
617                 if (rdpg->rp_pages[i] != NULL)
618                         __free_pages(rdpg->rp_pages[i], 0);
619         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
620
621         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
622
623         return rc;
624 }
625
626 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
627 {
628         struct req_capsule      *pill = &info->mti_pill;
629         struct mdt_device       *mdt = info->mti_mdt;
630         struct ptlrpc_request   *req = mdt_info_req(info);
631         int                      rc;
632         ENTRY;
633
634         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
635                 RETURN(-EFAULT);
636
637         rc = mdt_reint_unpack(info, op);
638         if (rc != 0) 
639                 RETURN(rc);
640                 
641         /*pack reply*/
642         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
643                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
644                                      mdt->mdt_max_mdsize);
645         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
646                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
647                                      mdt->mdt_max_cookiesize);
648         rc = req_capsule_pack(pill);
649         if (rc != 0)
650                 RETURN(rc);
651
652         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
653                 struct mdt_client_data *mcd;
654                         
655                 mcd = req->rq_export->exp_mdt_data.med_mcd;
656                 if (mcd->mcd_last_xid == req->rq_xid) {
657                         mdt_reconstruct(info);
658                         RETURN(lustre_msg_get_status(req->rq_repmsg));
659                 } 
660                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
661                                      mcd->mcd_last_xid);
662         }
663         rc = mdt_reint_rec(info);
664
665         RETURN(rc);
666 }
667
668 static long mdt_reint_opcode(struct mdt_thread_info *info,
669                              const struct req_format **fmt)
670 {
671         __u32 *ptr;
672         long opc;
673
674         opc = -EFAULT;
675         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
676         if (ptr != NULL) {
677                 opc = *ptr;
678                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
679                 if (opc < REINT_MAX && fmt[opc] != NULL)
680                         req_capsule_extend(&info->mti_pill, fmt[opc]);
681                 else
682                         CERROR("Unsupported opc: %ld\n", opc);
683         }
684         return opc;
685 }
686
687 static int mdt_reint(struct mdt_thread_info *info)
688 {
689         long opc;
690         int  rc;
691
692         static const struct req_format *reint_fmts[REINT_MAX] = {
693                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
694                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
695                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
696                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
697                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
698                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
699         };
700
701         ENTRY;
702
703         opc = mdt_reint_opcode(info, reint_fmts);
704         if (opc >= 0) {
705                 rc = mdt_reint_internal(info, opc);
706         } else
707                 rc = opc;
708         
709         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
710         RETURN(rc);
711 }
712
/* TODO: the two sync methods below are not implemented yet. */

/*
 * Should sync the whole device; currently a stub that does nothing and
 * reports success.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        return 0;
}
720
/*
 * Should sync the object attached to @info; currently a stub that does
 * nothing and reports success.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        return 0;
}
726
727 static int mdt_sync(struct mdt_thread_info *info)
728 {
729         struct req_capsule *pill = &info->mti_pill;
730         struct mdt_body *body;
731         int rc;
732         ENTRY;
733
734         /* The fid may be zero, so we req_capsule_set manually */
735         req_capsule_set(pill, &RQF_MDS_SYNC);
736
737         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
738         if (body == NULL)
739                 RETURN(-EINVAL);
740
741         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
742                 RETURN(-ENOMEM);
743
744         if (fid_seq(&body->fid1) == 0) {
745                 /* sync the whole device */
746                 rc = req_capsule_pack(pill);
747                 if (rc == 0)
748                         rc = mdt_device_sync(info);
749         } else {
750                 /* sync an object */
751                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
752                 if (rc == 0) {
753                         rc = mdt_object_sync(info);
754                         if (rc == 0) {
755                                 struct md_object    *next;
756                                 const struct lu_fid *fid;
757                                 struct lu_attr      *la;
758
759                                 next = mdt_object_child(info->mti_object);
760                                 fid = mdt_object_fid(info->mti_object);
761                                 info->mti_attr.ma_need = MA_INODE;
762                                 rc = mo_attr_get(info->mti_ctxt, next, 
763                                                  &info->mti_attr);
764                                 la = &info->mti_attr.ma_attr;
765                                 if (rc == 0) {
766                                         body = req_capsule_server_get(pill,
767                                                                 &RMF_MDT_BODY);
768                                         mdt_pack_attr2body(body, la, fid);
769                                 }
770                         }
771                 }
772         }
773         RETURN(rc);
774 }
775
/* Quotacheck handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_quotacheck_handle(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
780
/* Quotactl handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_quotactl_handle(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
785
786 /*
787  * OBD PING and other handlers.
788  */
789 static int mdt_obd_ping(struct mdt_thread_info *info)
790 {
791         int result;
792         ENTRY;
793         result = target_handle_ping(mdt_info_req(info));
794         RETURN(result);
795 }
796
/* Log cancel handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_obd_log_cancel(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
801
/* Quotacheck callback handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_obd_qc_callback(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
806
807
808 /*
809  * DLM handlers.
810  */
811
812 static struct ldlm_callback_suite cbs = {
813         .lcs_completion = ldlm_server_completion_ast,
814         .lcs_blocking   = ldlm_server_blocking_ast,
815         .lcs_glimpse    = NULL
816 };
817
818 static int mdt_enqueue(struct mdt_thread_info *info)
819 {
820         int result;
821         struct ptlrpc_request *req;
822
823         /*
824          * info->mti_dlm_req already contains swapped and (if necessary)
825          * converted dlm request.
826          */
827         LASSERT(info->mti_dlm_req != NULL);
828
829         req = mdt_info_req(info);
830         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
831         result = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
832                                       req, info->mti_dlm_req, &cbs);
833         return result ? : req->rq_status;
834 }
835
836 static int mdt_convert(struct mdt_thread_info *info)
837 {
838         int result;
839         struct ptlrpc_request *req;
840
841         LASSERT(info->mti_dlm_req);
842         req = mdt_info_req(info);
843         result = ldlm_handle_convert0(req, info->mti_dlm_req);
844         return result ? : req->rq_status;
845 }
846
/*
 * Blocking callback handler. Such callbacks are not expected here: the
 * event is logged and LBUG() is raised; -EOPNOTSUPP is the nominal return.
 */
static int mdt_bl_callback(struct mdt_thread_info *info)
{
        const int rc = -EOPNOTSUPP;

        CERROR("bl callbacks should not happen on MDS\n");
        LBUG();
        return rc;
}
853
/*
 * Completion callback handler. Such callbacks are not expected here: the
 * event is logged and LBUG() is raised; -EOPNOTSUPP is the nominal return.
 */
static int mdt_cp_callback(struct mdt_thread_info *info)
{
        const int rc = -EOPNOTSUPP;

        CERROR("cp callbacks should not happen on MDS\n");
        LBUG();
        return rc;
}
860
861 /*
862  * Build (DLM) resource name from fid.
863  */
864 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
865                                        struct ldlm_res_id *name)
866 {
867         memset(name, 0, sizeof *name);
868         name->name[0] = fid_seq(f);
869         name->name[1] = fid_oid(f);
870         name->name[2] = fid_ver(f);
871         return name;
872 }
873
874 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
875 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
876              struct lustre_handle *lh, ldlm_mode_t mode,
877              ldlm_policy_data_t *policy,
878              struct ldlm_res_id *res_id)
879 {
880         int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
881         int rc;
882
883         LASSERT(ns != NULL);
884         LASSERT(lh != NULL);
885         LASSERT(f != NULL);
886
887         rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id),
888                                     LDLM_IBITS, policy, mode, &flags, 
889                                     ldlm_blocking_ast, ldlm_completion_ast, 
890                                     NULL, NULL, 0, NULL, lh);
891         return rc == ELDLM_OK ? 0 : -EIO;
892 }
893
894 /* just call ldlm_lock_decref() if decref,
895  * else we only call ptlrpc_save_lock() to save this lock in req.
896  * when transaction committed, req will be released, and lock will, too */
897 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
898                 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
899 {
900         {
901         /* FIXME: this is debug stuff, remove it later. */
902                 struct ldlm_lock *lock = ldlm_handle2lock(lh);
903                 if (!lock) {
904                         CERROR("invalid lock handle "LPX64, lh->cookie);
905                         LBUG();
906                 }
907                 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
908                 LDLM_LOCK_PUT(lock);
909         }
910         if (decref)
911                 ldlm_lock_decref(lh, mode);
912         else
913                 ptlrpc_save_lock(req, lh, mode);
914 }
915
916 static struct mdt_object *mdt_obj(struct lu_object *o)
917 {
918         LASSERT(lu_device_is_mdt(o->lo_dev));
919         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
920 }
921
922 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
923                                    struct mdt_device *d,
924                                    const struct lu_fid *f)
925 {
926         struct lu_object *o;
927         struct mdt_object *m;
928         ENTRY;
929
930         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
931         if (IS_ERR(o))
932                 m = (struct mdt_object *)o;
933         else
934                 m = mdt_obj(o);
935         RETURN(m);
936 }
937
938 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
939                     struct mdt_lock_handle *lh, __u64 ibits)
940 {
941         ldlm_policy_data_t *policy = &info->mti_policy;
942         struct ldlm_res_id *res_id = &info->mti_res_id;
943         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
944         int rc;
945         ENTRY;
946
947         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
948         LASSERT(lh->mlh_mode != LCK_MINMODE);
949
950         policy->l_inodebits.bits = ibits;
951
952         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, 
953                       policy, res_id);
954         RETURN(rc);
955 }
956
957 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
958                        struct mdt_lock_handle *lh, int decref)
959 {
960         struct ptlrpc_request *req = mdt_info_req(info);
961         ENTRY;
962
963         if (lustre_handle_is_used(&lh->mlh_lh)) {
964                 fid_unlock(req, mdt_object_fid(o),
965                            &lh->mlh_lh, lh->mlh_mode, decref);
966                 lh->mlh_lh.cookie = 0;
967         }
968         EXIT;
969 }
970
971 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
972                                         const struct lu_fid *f,
973                                         struct mdt_lock_handle *lh,
974                                         __u64 ibits)
975 {
976         struct mdt_object *o;
977
978         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
979         if (!IS_ERR(o)) {
980                 int result;
981
982                 result = mdt_object_lock(info, o, lh, ibits);
983                 if (result != 0) {
984                         mdt_object_put(info->mti_ctxt, o);
985                         o = ERR_PTR(result);
986                 }
987         }
988         return o;
989 }
990
991 void mdt_object_unlock_put(struct mdt_thread_info * info,
992                            struct mdt_object * o,
993                            struct mdt_lock_handle *lh,
994                            int decref)
995 {
996         mdt_object_unlock(info, o, lh, decref);
997         mdt_object_put(info->mti_ctxt, o);
998 }
999
1000 static struct mdt_handler *mdt_handler_find(__u32 opc,
1001                                             struct mdt_opc_slice *supported)
1002 {
1003         struct mdt_opc_slice *s;
1004         struct mdt_handler   *h;
1005
1006         h = NULL;
1007         for (s = supported; s->mos_hs != NULL; s++) {
1008                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1009                         h = s->mos_hs + (opc - s->mos_opc_start);
1010                         if (h->mh_opc != 0)
1011                                 LASSERT(h->mh_opc == opc);
1012                         else
1013                                 h = NULL; /* unsupported opc */
1014                         break;
1015                 }
1016         }
1017         return h;
1018 }
1019
1020 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1021 {
1022         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
1023 }
1024
1025 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1026 {
1027         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
1028 }
1029
/*
 * Placeholder for resource-name compatibility conversion of an incoming
 * dlm request. Does nothing yet: both arguments are ignored and 0 is
 * returned.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        const int rc = 0;

        /* XXX something... later. */
        return rc;
}
1036
/*
 * Placeholder for resource-name compatibility conversion of an outgoing
 * dlm reply. Does nothing yet: both arguments are ignored and 0 is
 * returned.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        const int rc = 0;

        /* XXX something... later. */
        return rc;
}
1042
1043 /*
1044  * Generic code handling requests that have struct mdt_body passed in:
1045  *
1046  *  - extract mdt_body from request and save it in @info, if present;
1047  *
1048  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1049  *  @info;
1050  *
1051  *  - if HABEO_CORPUS flag is set for this request type check whether object
1052  *  actually exists on storage (lu_object_exists()).
1053  *
1054  */
1055 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1056 {
1057         const struct mdt_body   *body;
1058         struct mdt_object       *obj;
1059         const struct lu_context *ctx;
1060         struct req_capsule      *pill;
1061         int                     result;
1062
1063         ctx = info->mti_ctxt;
1064         pill = &info->mti_pill;
1065
1066         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1067         if (body != NULL) {
1068                 if (fid_is_sane(&body->fid1)) {
1069                         obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
1070                         if (!IS_ERR(obj)) {
1071                                 if ((flags & HABEO_CORPUS) &&
1072                                     !lu_object_exists(&obj->mot_obj.mo_lu)) {
1073                                         mdt_object_put(ctx, obj);
1074                                         result = -ENOENT;
1075                                 } else {
1076                                         info->mti_object = obj;
1077                                         result = 0;
1078                                 }
1079                         } else
1080                                 result = PTR_ERR(obj);
1081                 } else {
1082                         CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1083                         result = -EINVAL;
1084                 }
1085         } else
1086                 result = -EFAULT;
1087         return result;
1088 }
1089
1090 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1091 {
1092         struct req_capsule *pill;
1093         int result;
1094
1095         ENTRY;
1096         pill = &info->mti_pill;
1097
1098         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1099                 result = mdt_body_unpack(info, flags);
1100         else
1101                 result = 0;
1102
1103         if (result == 0 && (flags & HABEO_REFERO)) {
1104                 struct mdt_device       *mdt = info->mti_mdt;
1105                 /*pack reply*/
1106                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1107                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1108                                              mdt->mdt_max_mdsize);
1109                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1110                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1111                                              mdt->mdt_max_cookiesize);
1112
1113                 result = req_capsule_pack(pill);
1114         }
1115         RETURN(result);
1116 }
1117
/*
 * Context key with external linkage.
 * NOTE(review): not referenced in the surrounding code; presumably used for
 * per-context transaction data -- confirm against its registration site.
 */
struct lu_context_key mdt_txn_key;
1119
1120 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1121 {
1122         struct mdt_device     *mdt = info->mti_mdt;
1123         struct ptlrpc_request *req = mdt_info_req(info);
1124         struct obd_export     *exp = req->rq_export;
1125
1126         /* sometimes the reply message has not been successfully packed */
1127         if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1128                 return;
1129
1130         if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1131                 return;
1132         
1133         /*XXX: assert on this when all code will be finished */
1134         if (rc != 0 && info->mti_transno != 0) {
1135                 info->mti_transno = 0;
1136                 CERROR("Transno is not 0 while rc is %i!\n", rc);
1137         }
1138
1139         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1140                info->mti_transno, exp->exp_obd->obd_last_committed);
1141
1142         spin_lock(&mdt->mdt_transno_lock);
1143         req->rq_transno = info->mti_transno;
1144         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1145         
1146         target_committed_to_req(req);
1147         
1148         spin_unlock(&mdt->mdt_transno_lock);
1149         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1150         //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1151 }
1152
1153 /*
1154  * Invoke handler for this request opc. Also do necessary preprocessing
1155  * (according to handler ->mh_flags), and post-processing (setting of
1156  * ->last_{xid,committed}).
1157  */
1158 static int mdt_req_handle(struct mdt_thread_info *info,
1159                           struct mdt_handler *h, struct ptlrpc_request *req)
1160 {
1161         int   result;
1162         __u32 flags;
1163
1164         ENTRY;
1165
1166         LASSERT(h->mh_act != NULL);
1167         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1168         LASSERT(current->journal_info == NULL);
1169
1170         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1171
1172         if (h->mh_fail_id != 0)
1173                 MDT_FAIL_RETURN(h->mh_fail_id, 0);
1174
1175         result = 0;
1176         flags = h->mh_flags;
1177         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1178
1179         if (h->mh_fmt != NULL) {
1180                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1181                 result = mdt_unpack_req_pack_rep(info, flags);
1182         }
1183
1184         if (result == 0 && flags & MUTABOR &&
1185             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1186                 result = -EROFS;
1187
1188         if (result == 0 && flags & HABEO_CLAVIS) {
1189                 struct ldlm_request *dlm_req;
1190
1191                 LASSERT(h->mh_fmt != NULL);
1192
1193                 dlm_req = req_capsule_client_get(&info->mti_pill,&RMF_DLM_REQ);
1194                 if (dlm_req != NULL) {
1195                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1196                                 result = mdt_lock_resname_compat(info->mti_mdt,
1197                                                                  dlm_req);
1198                         info->mti_dlm_req = dlm_req;
1199                 } else {
1200                         CERROR("Can't unpack dlm request\n");
1201                         result = -EFAULT;
1202                 }
1203         }
1204
1205         if (result == 0)
1206                 /*
1207                  * Process request.
1208                  */
1209                 result = h->mh_act(info);
1210         /*
1211          * XXX result value is unconditionally shoved into ->rq_status
1212          * (original code sometimes placed error code into ->rq_status, and
1213          * sometimes returned it to the
1214          * caller). ptlrpc_server_handle_request() doesn't check return value
1215          * anyway.
1216          */
1217         req->rq_status = result;
1218         result = 0;
1219         LASSERT(current->journal_info == NULL);
1220
1221         if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname){
1222                 struct ldlm_reply *dlmrep;
1223
1224                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1225                 if (dlmrep != NULL)
1226                         result = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1227         }
1228
1229         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1230
1231         if (h->mh_opc != MDS_DISCONNECT &&
1232             h->mh_opc != MDS_READPAGE &&
1233             h->mh_opc != LDLM_ENQUEUE) {
1234                 mdt_finish_reply(info, req->rq_status);
1235         }
1236         RETURN(result);
1237 }
1238
1239
1240 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1241 {
1242         lh->mlh_lh.cookie = 0ull;
1243         lh->mlh_mode = LCK_MINMODE;
1244 }
1245
1246 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1247 {
1248         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1249 }
1250
1251 static void mdt_thread_info_init(struct ptlrpc_request *req,
1252                                  struct mdt_thread_info *info)
1253 {
1254         int i;
1255
1256         memset(info, 0, sizeof(*info));
1257
1258         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1259         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1260                 info->mti_rep_buf_size[i] = -1;
1261
1262         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1263                 mdt_lock_handle_init(&info->mti_lh[i]);
1264
1265         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1266         info->mti_ctxt = req->rq_svc_thread->t_ctx;
1267         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1268         /* it can be NULL while CONNECT */
1269         if (req->rq_export)
1270                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1271         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1272                          info->mti_rep_buf_size);
1273 }
1274
1275 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1276 {
1277         int i;
1278
1279         req_capsule_fini(&info->mti_pill);
1280         if (info->mti_object != NULL) {
1281                 mdt_object_put(info->mti_ctxt, info->mti_object);
1282                 info->mti_object = NULL;
1283         }
1284         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1285                 mdt_lock_handle_fini(&info->mti_lh[i]);
1286 }
1287
/*
 * Defined in mds/handler.c. Called by mdt_recovery() while the obd device
 * is recovering; it returns an error code and reports through @process
 * whether the request should be processed now.
 */
extern int mds_filter_recovery_request(struct ptlrpc_request *req,
                                       struct obd_device *obd, int *process);
1291 /*
1292  * Handle recovery. Return:
1293  *        +1: continue request processing;
1294  *       -ve: abort immediately with the given error code;
1295  *         0: send reply with error code in req->rq_status;
1296  */
1297 static int mdt_recovery(struct ptlrpc_request *req)
1298 {
1299         int recovering;
1300         int abort_recovery;
1301         struct obd_device *obd;
1302
1303         ENTRY;
1304
1305         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT)
1306                 RETURN(+1);
1307
1308         if (req->rq_export == NULL) {
1309                 CERROR("operation %d on unconnected MDS from %s\n",
1310                        lustre_msg_get_opc(req->rq_reqmsg),
1311                        libcfs_id2str(req->rq_peer));
1312                 req->rq_status = -ENOTCONN;
1313                 RETURN(-ENOTCONN);
1314         }
1315
1316         /* sanity check: if the xid matches, the request must be marked as a
1317          * resent or replayed */
1318         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req) ||
1319                       req->rq_xid == req_exp_last_close_xid(req),
1320                       lustre_msg_get_flags(req->rq_reqmsg) &
1321                       (MSG_RESENT | MSG_REPLAY)),
1322                  "rq_xid "LPU64" matches last_xid, "
1323                  "expected RESENT flag\n", req->rq_xid);
1324
1325         /* else: note the opposite is not always true; a RESENT req after a
1326          * failover will usually not match the last_xid, since it was likely
1327          * never committed. A REPLAYed request will almost never match the
1328          * last xid, however it could for a committed, but still retained,
1329          * open. */
1330
1331         obd = req->rq_export->exp_obd;
1332
1333         /* Check for aborted recovery... */
1334         spin_lock_bh(&obd->obd_processing_task_lock);
1335         abort_recovery = obd->obd_abort_recovery;
1336         recovering = obd->obd_recovering;
1337         spin_unlock_bh(&obd->obd_processing_task_lock);
1338         if (abort_recovery) {
1339                 target_abort_recovery(obd);
1340         } else if (recovering) {
1341                 int rc;
1342                 int should_process;
1343
1344                 rc = mds_filter_recovery_request(req, obd, &should_process);
1345                 if (rc != 0 || !should_process) {
1346                         RETURN(rc);
1347                 }
1348         }
1349         RETURN(+1);
1350 }
1351
1352 static int mdt_reply(struct ptlrpc_request *req, int result,
1353                      struct mdt_thread_info *info)
1354 {
1355         struct obd_device *obd;
1356         ENTRY;
1357
1358         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1359                 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1360                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1361
1362                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1363                 if (obd && obd->obd_recovering) {
1364                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1365                         RETURN(target_queue_final_reply(req, result));
1366                 } else {
1367                         /* Lost a race with recovery; let the error path
1368                          * DTRT. */
1369                         result = req->rq_status = -ENOTCONN;
1370                 }
1371         }
1372         target_send_reply(req, result, info->mti_fail_id);
1373         RETURN(0);
1374 }
1375
/*
 * Defined in mds/handler.c. mdt_handle0() calls it first on every request
 * message and drops the request as mal-formed when it returns non-zero.
 */
extern int mds_msg_check_version(struct lustre_msg *msg);
1378
1379 static int mdt_handle0(struct ptlrpc_request *req, 
1380                        struct mdt_thread_info *info,
1381                        struct mdt_opc_slice *supported)
1382 {
1383         struct mdt_handler *h;
1384         struct lustre_msg  *msg;
1385         int                 result;
1386
1387         ENTRY;
1388
1389         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1390
1391         LASSERT(current->journal_info == NULL);
1392
1393         msg = req->rq_reqmsg;
1394         result = mds_msg_check_version(msg);
1395         if (result == 0) {
1396                 result = mdt_recovery(req);
1397                 switch (result) {
1398                 case +1:
1399                         h = mdt_handler_find(lustre_msg_get_opc(msg), 
1400                                              supported);
1401                         if (h != NULL)
1402                                 result = mdt_req_handle(info, h, req);
1403                         else {
1404                                 req->rq_status = -ENOTSUPP;
1405                                 result = ptlrpc_error(req);
1406                                 break;
1407                         }
1408                         /* fall through */
1409                 case 0:
1410                         result = mdt_reply(req, result, info);
1411                 }
1412         } else
1413                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1414         RETURN(result);
1415 }
1416
1417 /*
1418  * MDT handler function called by ptlrpc service thread when request comes.
1419  *
1420  * XXX common "target" functionality should be factored into separate module
1421  * shared by mdt, ost and stand-alone services like fld.
1422  */
1423 static int mdt_handle_common(struct ptlrpc_request *req,
1424                              struct mdt_opc_slice *supported)
1425 {
1426         struct lu_context      *ctx;
1427         struct mdt_thread_info *info;
1428         int                     result;
1429         ENTRY;
1430
1431         ctx = req->rq_svc_thread->t_ctx;
1432         LASSERT(ctx != NULL);
1433         LASSERT(ctx->lc_thread == req->rq_svc_thread);
1434         info = lu_context_key_get(ctx, &mdt_thread_key);
1435         LASSERT(info != NULL);
1436
1437         mdt_thread_info_init(req, info);
1438
1439         result = mdt_handle0(req, info, supported);
1440
1441         mdt_thread_info_fini(info);
1442         RETURN(result);
1443 }
1444
1445 static int mdt_regular_handle(struct ptlrpc_request *req)
1446 {
1447         return mdt_handle_common(req, mdt_regular_handlers);
1448 }
1449
1450 static int mdt_readpage_handle(struct ptlrpc_request *req)
1451 {
1452         return mdt_handle_common(req, mdt_readpage_handlers);
1453 }
1454
/*
 * Dense internal intent codes. mdt_intent_code() maps the IT_* intent codes
 * onto these values, which are used as indices into mdt_it_flavor[].
 */
enum mdt_it_code {
        MDT_IT_OPEN,            /* IT_OPEN */
        MDT_IT_OCREAT,          /* IT_OPEN|IT_CREAT */
        MDT_IT_CREATE,          /* IT_CREAT */
        MDT_IT_GETATTR,         /* IT_GETATTR */
        MDT_IT_READDIR,         /* IT_READDIR */
        MDT_IT_LOOKUP,          /* IT_LOOKUP */
        MDT_IT_UNLINK,          /* IT_UNLINK */
        MDT_IT_TRUNC,           /* IT_TRUNC */
        MDT_IT_GETXATTR,        /* IT_GETXATTR */
        MDT_IT_NR               /* number of codes; keep last */
};
1467
/*
 * Forward declarations: both handlers are referenced by the mdt_it_flavor[]
 * initializer before they are defined.
 */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **,
                              int);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **,
                            int);
1476
/*
 * Per-intent description table, indexed by enum mdt_it_code:
 *
 *  - it_fmt:   request format for the intent (NULL when none is set);
 *  - it_flags: HABEO_* / MUTABOR flags for the intent;
 *  - it_act:   handler for the intent (NULL when none is set);
 *  - it_reint: reint opcode that mdt_intent_reint() expects to find in the
 *              request for this intent.
 */
static struct mdt_it_flavor {
        const struct req_format *it_fmt;
        __u32                    it_flags;
        int                    (*it_act)(enum mdt_it_code ,
                                         struct mdt_thread_info *,
                                         struct ldlm_lock **,
                                         int);
        long                     it_reint;
} mdt_it_flavor[] = {
        [MDT_IT_OPEN]     = {
                .it_fmt   = &RQF_LDLM_INTENT,
                /*.it_flags = HABEO_REFERO,*/
                .it_flags = 0,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_OPEN
        },
        [MDT_IT_OCREAT]   = {
                .it_fmt   = &RQF_LDLM_INTENT,
                .it_flags = MUTABOR,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_OPEN
        },
        [MDT_IT_CREATE]   = {
                .it_fmt   = &RQF_LDLM_INTENT,
                .it_flags = MUTABOR,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_CREATE
        },
        [MDT_IT_GETATTR]  = {
                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
                .it_flags = HABEO_REFERO,
                .it_act   = mdt_intent_getattr
        },
        /* no format or handler set */
        [MDT_IT_READDIR]  = {
                .it_fmt   = NULL,
                .it_flags = 0,
                .it_act   = NULL
        },
        [MDT_IT_LOOKUP]   = {
                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
                .it_flags = HABEO_REFERO,
                .it_act   = mdt_intent_getattr
        },
        /* format is known, but no handler is set */
        [MDT_IT_UNLINK]   = {
                .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
                .it_flags = MUTABOR,
                .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
                .it_reint = REINT_UNLINK
        },
        /* no format or handler set */
        [MDT_IT_TRUNC]    = {
                .it_fmt   = NULL,
                .it_flags = MUTABOR,
                .it_act   = NULL
        },
        /* no format or handler set */
        [MDT_IT_GETXATTR] = {
                .it_fmt   = NULL,
                .it_flags = 0,
                .it_act   = NULL
        }
};
1537
1538 static int mdt_intent_getattr(enum mdt_it_code opcode,
1539                               struct mdt_thread_info *info,
1540                               struct ldlm_lock **lockp,
1541                               int flags)
1542 {
1543         struct ldlm_lock       *old_lock = *lockp;
1544         struct ldlm_lock       *new_lock = NULL;
1545         struct ptlrpc_request  *req = mdt_info_req(info);
1546         struct ldlm_reply      *ldlm_rep;
1547         struct mdt_lock_handle  tmp_lock;
1548         struct mdt_lock_handle *lhc = &tmp_lock;
1549         struct mdt_device      *mdt = info->mti_mdt;
1550         __u64                   child_bits;
1551         int                     rc;
1552
1553         ENTRY;
1554
1555         switch (opcode) {
1556         case MDT_IT_LOOKUP:
1557                 child_bits = MDS_INODELOCK_LOOKUP;
1558                 break;
1559         case MDT_IT_GETATTR:
1560                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1561                 break;
1562         default:
1563                 CERROR("Unhandled till now");
1564                 RETURN(-EINVAL);
1565         }
1566
1567         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1568         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
1569
1570         ldlm_rep->lock_policy_res2 =
1571                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
1572         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
1573
1574         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1575                 ldlm_rep->lock_policy_res2 = 0;
1576         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1577                     ldlm_rep->lock_policy_res2) {
1578                 RETURN(ELDLM_LOCK_ABORTED);
1579         }
1580
1581         new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1582         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1583                 RETURN(0);
1584
1585         LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1586                  opcode, lhc->mlh_lh.cookie);
1587
1588         *lockp = new_lock;
1589
1590         /* FIXME:This only happens when MDT can handle RESENT */
1591         if (new_lock->l_export == req->rq_export) {
1592                 /* Already gave this to the client, which means that we
1593                  * reconstructed a reply. */
1594                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1595                         MSG_RESENT);
1596                 RETURN(ELDLM_LOCK_REPLACED);
1597         }
1598
1599         /* TODO:
1600          * These are copied from mds/hander.c, and should be factored into
1601          * ldlm module in order to share these code, and be easy for merge.
1602          */
1603
1604         /* Fixup the lock to be given to the client */
1605         lock_res_and_lock(new_lock);
1606         new_lock->l_readers = 0;
1607         new_lock->l_writers = 0;
1608
1609         new_lock->l_export = class_export_get(req->rq_export);
1610         list_add(&new_lock->l_export_chain,
1611                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1612
1613         new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1614         new_lock->l_completion_ast = old_lock->l_completion_ast;
1615
1616         new_lock->l_remote_handle = old_lock->l_remote_handle;
1617
1618         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1619
1620         unlock_res_and_lock(new_lock);
1621         LDLM_LOCK_PUT(new_lock);
1622
1623         RETURN(ELDLM_LOCK_REPLACED);
1624 }
1625
1626 static int mdt_intent_reint(enum mdt_it_code opcode,
1627                             struct mdt_thread_info *info,
1628                             struct ldlm_lock **lockp,
1629                             int flags)
1630 {
1631         long opc;
1632         int rc;
1633         struct ldlm_reply *rep;
1634
1635         static const struct req_format *intent_fmts[REINT_MAX] = {
1636                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
1637                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
1638         };
1639
1640         ENTRY;
1641
1642         opc = mdt_reint_opcode(info, intent_fmts);
1643         if (opc < 0)
1644                 RETURN(opc);
1645
1646         if (mdt_it_flavor[opcode].it_reint != opc) {
1647                 CERROR("Reint code %ld doesn't match intent: %d\n",
1648                        opc, opcode);
1649                 RETURN(-EPROTO);
1650         }
1651
1652         rc = mdt_reint_internal(info, opc);
1653
1654         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1655         if (rep == NULL)
1656                 RETURN(-EFAULT);
1657         rep->lock_policy_res2 = rc;
1658
1659         mdt_set_disposition(info, rep, DISP_IT_EXECD);
1660         
1661         mdt_finish_reply(info, rc);
1662
1663         RETURN(ELDLM_LOCK_ABORTED);
1664 }
1665
1666 static int mdt_intent_code(long itcode)
1667 {
1668         int result;
1669
1670         switch(itcode) {
1671         case IT_OPEN:
1672                 result = MDT_IT_OPEN;
1673                 break;
1674         case IT_OPEN|IT_CREAT:
1675                 result = MDT_IT_OCREAT;
1676                 break;
1677         case IT_CREAT:
1678                 result = MDT_IT_CREATE;
1679                 break;
1680         case IT_READDIR:
1681                 result = MDT_IT_READDIR;
1682                 break;
1683         case IT_GETATTR:
1684                 result = MDT_IT_GETATTR;
1685                 break;
1686         case IT_LOOKUP:
1687                 result = MDT_IT_LOOKUP;
1688                 break;
1689         case IT_UNLINK:
1690                 result = MDT_IT_UNLINK;
1691                 break;
1692         case IT_TRUNC:
1693                 result = MDT_IT_TRUNC;
1694                 break;
1695         case IT_GETXATTR:
1696                 result = MDT_IT_GETXATTR;
1697                 break;
1698         default:
1699                 CERROR("Unknown intent opcode: %ld\n", itcode);
1700                 result = -EINVAL;
1701                 break;
1702         }
1703         return result;
1704 }
1705
1706 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1707                           struct ldlm_lock **lockp, int flags)
1708 {
1709         struct req_capsule   *pill;
1710         struct mdt_it_flavor *flv;
1711         int opc;
1712         int rc;
1713         ENTRY;
1714
1715         opc = mdt_intent_code(itopc);
1716         if (opc < 0)
1717                 RETURN(-EINVAL);
1718
1719         pill = &info->mti_pill;
1720         flv  = &mdt_it_flavor[opc];
1721
1722         if (flv->it_fmt != NULL)
1723                 req_capsule_extend(pill, flv->it_fmt);
1724
1725         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1726         if (rc == 0) {
1727                 struct ptlrpc_request *req = mdt_info_req(info);
1728                 if (flv->it_flags & MUTABOR &&
1729                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1730                         rc = -EROFS;
1731         }
1732         if (rc == 0 && flv->it_act != NULL) {
1733                 /* execute policy */
1734                 rc = flv->it_act(opc, info, lockp, flags);
1735         } else
1736                 rc = -EOPNOTSUPP;
1737         RETURN(rc);
1738 }
1739
1740 static int mdt_intent_policy(struct ldlm_namespace *ns,
1741                              struct ldlm_lock **lockp, void *req_cookie,
1742                              ldlm_mode_t mode, int flags, void *data)
1743 {
1744         struct mdt_thread_info *info;
1745         struct ptlrpc_request  *req  =  req_cookie;
1746         struct ldlm_intent     *it;
1747         struct req_capsule     *pill;
1748         struct ldlm_lock       *lock = *lockp;
1749         int rc;
1750
1751         ENTRY;
1752
1753         LASSERT(req != NULL);
1754
1755         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1756         LASSERT(info != NULL);
1757         pill = &info->mti_pill;
1758         LASSERT(pill->rc_req == req);
1759
1760         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
1761                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1762                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1763                 if (it != NULL) {
1764                         LDLM_DEBUG(lock, "intent policy opc: %s",
1765                                    ldlm_it2str(it->opc));
1766
1767                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
1768                         if (rc == 0)
1769                                 rc = ELDLM_OK;
1770                 } else
1771                         rc = -EFAULT;
1772         } else {
1773                 /* No intent was provided */
1774                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1775                 rc = req_capsule_pack(pill);
1776         }
1777         RETURN(rc);
1778 }
1779
1780 /*
1781  * Seq wrappers
1782  */
1783 static int mdt_seq_fini(const struct lu_context *ctx,
1784                         struct mdt_device *m)
1785 {
1786         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1787         ENTRY;
1788
1789         if (ls && ls->ls_server_seq) {
1790                 seq_server_fini(ls->ls_server_seq, ctx);
1791                 OBD_FREE_PTR(ls->ls_server_seq);
1792                 ls->ls_server_seq = NULL;
1793         }
1794         if (ls && ls->ls_control_seq) {
1795                 seq_server_fini(ls->ls_control_seq, ctx);
1796                 OBD_FREE_PTR(ls->ls_control_seq);
1797                 ls->ls_control_seq = NULL;
1798         }
1799         RETURN(0);
1800 }
1801
1802 static int mdt_seq_init(const struct lu_context *ctx,
1803                         const char *uuid,
1804                         struct mdt_device *m)
1805 {
1806         struct lu_site *ls;
1807         int rc;
1808         ENTRY;
1809
1810         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1811
1812         /* sequence-controller node */
1813         if (ls->ls_node_id == 0) {
1814                 LASSERT(ls->ls_control_seq == NULL);
1815                 OBD_ALLOC_PTR(ls->ls_control_seq);
1816
1817                 if (ls->ls_control_seq != NULL) {
1818                         rc = seq_server_init(ls->ls_control_seq,
1819                                              m->mdt_bottom, uuid,
1820                                              LUSTRE_SEQ_CONTROLLER,
1821                                              ctx);
1822                 } else
1823                         rc = -ENOMEM;
1824         }
1825
1826         LASSERT(ls->ls_server_seq == NULL);
1827         OBD_ALLOC_PTR(ls->ls_server_seq);
1828
1829         if (ls->ls_server_seq != NULL) {
1830                 rc = seq_server_init(ls->ls_server_seq,
1831                                      m->mdt_bottom, uuid,
1832                                      LUSTRE_SEQ_SERVER,
1833                                      ctx);
1834         } else
1835                 rc = -ENOMEM;
1836
1837         if (rc)
1838                 mdt_seq_fini(ctx, m);
1839
1840         RETURN(rc);
1841 }
1842
1843 /*
1844  * Init client sequence manager which is used by local MDS to talk to sequence
1845  * controller on remote node.
1846  */
1847 static int mdt_seq_init_cli(const struct lu_context *ctx,
1848                             struct mdt_device *m,
1849                             struct lustre_cfg *cfg)
1850 {
1851         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1852         struct obd_device *mdc;
1853         struct obd_uuid   *uuidp;
1854         char              *uuid_str;
1855         int               rc;
1856         int               index;
1857         struct mdt_thread_info *info;
1858         char *p, *index_string = lustre_cfg_string(cfg, 2);
1859         ENTRY;
1860
1861         info = lu_context_key_get(ctx, &mdt_thread_key);
1862         uuidp = &info->mti_u.uuid;
1863
1864         LASSERT(index_string);
1865
1866         index = simple_strtol(index_string, &p, 10);
1867         if (*p) {
1868                 CERROR("Invalid index in lustre_cgf, offset 2\n");
1869                 RETURN(-EINVAL);
1870         }
1871
1872         /* check if this is first MDC add and controller is not yet
1873          * initialized. */
1874         if (index != 0 || ls->ls_client_exp)
1875                 RETURN(0);
1876
1877         uuid_str = lustre_cfg_string(cfg, 1);
1878         obd_str2uuid(uuidp, uuid_str);
1879         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, NULL);
1880         if (!mdc) {
1881                 CERROR("can't find controller MDC by uuid %s\n",
1882                        uuid_str);
1883                 rc = -ENOENT;
1884         } else if (!mdc->obd_set_up) {
1885                 CERROR("target %s not set up\n", mdc->obd_name);
1886                 rc = -EINVAL;
1887         } else {
1888                 struct lustre_handle conn = {0, };
1889
1890                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
1891                        mdc->obd_name, mdc->obd_uuid.uuid);
1892
1893                 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
1894
1895                 if (rc) {
1896                         CERROR("target %s connect error %d\n",
1897                                mdc->obd_name, rc);
1898                 } else {
1899                         ls->ls_client_exp = class_conn2export(&conn);
1900
1901                         OBD_ALLOC_PTR(ls->ls_client_seq);
1902
1903                         if (ls->ls_client_seq != NULL) {
1904                                 rc = seq_client_init(ls->ls_client_seq,
1905                                                      mdc->obd_name,
1906                                                      ls->ls_client_exp,
1907                                                      LUSTRE_SEQ_METADATA);
1908                         } else
1909                                 rc = -ENOMEM;
1910
1911                         if (rc)
1912                                 RETURN(rc);
1913                         /*FIXME: add client seq to mdc obd for 
1914                          *allocating fid in create slave objects,
1915                          *may need better way to fix it,
1916                          *why not init client seq in cmm_add_mdc?*/
1917                         mdc->u.cli.cl_seq = ls->ls_client_seq;
1918
1919                         LASSERT(ls->ls_server_seq != NULL);
1920
1921                         rc = seq_server_set_cli(ls->ls_server_seq,
1922                                                 ls->ls_client_seq,
1923                                                 ctx);
1924                 }
1925         }
1926
1927         RETURN(rc);
1928 }
1929
1930 static void mdt_seq_fini_cli(struct mdt_device *m)
1931 {
1932         struct lu_site *ls;
1933
1934         ENTRY;
1935
1936         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1937
1938         if (ls && ls->ls_server_seq)
1939                 seq_server_set_cli(ls->ls_server_seq,
1940                                    NULL, NULL);
1941
1942         if (ls && ls->ls_client_seq) {
1943                 seq_client_fini(ls->ls_client_seq);
1944                 OBD_FREE_PTR(ls->ls_client_seq);
1945                 ls->ls_client_seq = NULL;
1946         }
1947
1948         if (ls && ls->ls_client_exp) {
1949                 int rc = obd_disconnect(ls->ls_client_exp);
1950                 ls->ls_client_exp = NULL;
1951
1952                 if (rc) {
1953                         CERROR("failure to disconnect "
1954                                "obd: %d\n", rc);
1955                 }
1956         }
1957         EXIT;
1958 }
1959
1960 /*
1961  * FLD wrappers
1962  */
1963 static int mdt_fld_init(const struct lu_context *ctx,
1964                         const char *uuid,
1965                         struct mdt_device *m)
1966 {
1967         struct lu_site *ls;
1968         int rc;
1969         ENTRY;
1970
1971         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1972
1973         OBD_ALLOC_PTR(ls->ls_server_fld);
1974
1975         if (ls->ls_server_fld != NULL) {
1976                 rc = fld_server_init(ls->ls_server_fld, ctx,
1977                                      m->mdt_bottom, uuid);
1978                 if (rc) {
1979                         OBD_FREE_PTR(ls->ls_server_fld);
1980                         ls->ls_server_fld = NULL;
1981                 }
1982         } else
1983                 rc = -ENOMEM;
1984
1985         RETURN(rc);
1986 }
1987
1988 static int mdt_fld_fini(const struct lu_context *ctx,
1989                         struct mdt_device *m)
1990 {
1991         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1992         ENTRY;
1993
1994         if (ls && ls->ls_server_fld) {
1995                 fld_server_fini(ls->ls_server_fld, ctx);
1996                 OBD_FREE_PTR(ls->ls_server_fld);
1997                 ls->ls_server_fld = NULL;
1998         }
1999         RETURN(0);
2000 }
2001
2002 /* device init/fini methods */
2003 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2004 {
2005         if (m->mdt_service != NULL) {
2006                 ptlrpc_unregister_service(m->mdt_service);
2007                 m->mdt_service = NULL;
2008         }
2009         if (m->mdt_readpage_service != NULL) {
2010                 ptlrpc_unregister_service(m->mdt_readpage_service);
2011                 m->mdt_readpage_service = NULL;
2012         }
2013         if (m->mdt_setattr_service != NULL) {
2014                 ptlrpc_unregister_service(m->mdt_setattr_service);
2015                 m->mdt_setattr_service = NULL;
2016         }
2017 }
2018
2019 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2020 {
2021         int rc;
2022         static struct ptlrpc_service_conf conf;
2023         ENTRY;
2024
2025         conf = (typeof(conf)) {
2026                 .psc_nbufs            = MDS_NBUFS,
2027                 .psc_bufsize          = MDS_BUFSIZE,
2028                 .psc_max_req_size     = MDS_MAXREQSIZE,
2029                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2030                 .psc_req_portal       = MDS_REQUEST_PORTAL,
2031                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2032                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2033                 /*
2034                  * We'd like to have a mechanism to set this on a per-device
2035                  * basis, but alas...
2036                  */
2037                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2038                                        MDT_MAX_THREADS),
2039                 .psc_ctx_tags      = LCT_MD_THREAD
2040         };
2041
2042         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2043         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2044                            "mdt_ldlm_client", m->mdt_ldlm_client);
2045
2046         m->mdt_service =
2047                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT0_NAME,
2048                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2049                                      NULL);
2050         if (m->mdt_service == NULL)
2051                 RETURN(-ENOMEM);
2052
2053         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
2054         if (rc)
2055                 GOTO(err_mdt_svc, rc);
2056
2057         /*
2058          * readpage service configuration. Parameters have to be adjusted,
2059          * ideally.
2060          */
2061         conf = (typeof(conf)) {
2062                 .psc_nbufs            = MDS_NBUFS,
2063                 .psc_bufsize          = MDS_BUFSIZE,
2064                 .psc_max_req_size     = MDS_MAXREQSIZE,
2065                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2066                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2067                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2068                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2069                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2070                                        MDT_MAX_THREADS),
2071                 .psc_ctx_tags      = LCT_MD_THREAD
2072         };
2073         m->mdt_readpage_service =
2074                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2075                                      LUSTRE_MDT0_NAME "_readpage",
2076                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2077                                      NULL);
2078
2079         if (m->mdt_readpage_service == NULL) {
2080                 CERROR("failed to start readpage service\n");
2081                 GOTO(err_mdt_svc, rc = -ENOMEM);
2082         }
2083
2084         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2085
2086         /*
2087          * setattr service configuration.
2088          */
2089         conf = (typeof(conf)) {
2090                 .psc_nbufs            = MDS_NBUFS,
2091                 .psc_bufsize          = MDS_BUFSIZE,
2092                 .psc_max_req_size     = MDS_MAXREQSIZE,
2093                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2094                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2095                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2096                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2097                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2098                                        MDT_MAX_THREADS),
2099                 .psc_ctx_tags      = LCT_MD_THREAD
2100         };
2101
2102         m->mdt_setattr_service =
2103                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2104                                      LUSTRE_MDT0_NAME "_setattr",
2105                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2106                                      NULL);
2107
2108         if (!m->mdt_setattr_service) {
2109                 CERROR("failed to start setattr service\n");
2110                 GOTO(err_mdt_svc, rc = -ENOMEM);
2111         }
2112
2113         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2114         if (rc)
2115                 GOTO(err_mdt_svc, rc);
2116
2117         EXIT;
2118 err_mdt_svc:
2119         if (rc)
2120                 mdt_stop_ptlrpc_service(m);
2121
2122         return rc;
2123 }
2124
2125 static void mdt_stack_fini(const struct lu_context *ctx,
2126                            struct mdt_device *m, struct lu_device *top)
2127 {
2128         struct lu_device        *d = top, *n;
2129         struct lustre_cfg_bufs  *bufs;
2130         struct lustre_cfg       *lcfg;
2131         struct mdt_thread_info  *info;
2132         ENTRY;
2133
2134         info = lu_context_key_get(ctx, &mdt_thread_key);
2135         LASSERT(info != NULL);
2136
2137         bufs = &info->mti_u.bufs;
2138         /* process cleanup */
2139         lustre_cfg_bufs_reset(bufs, NULL);
2140         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2141         if (!lcfg) {
2142                 CERROR("Cannot alloc lcfg!\n");
2143                 return;
2144         }
2145         top->ld_ops->ldo_process_config(ctx, top, lcfg);
2146
2147         lu_site_purge(ctx, top->ld_site, ~0);
2148         while (d != NULL) {
2149                 struct obd_type *type;
2150                 struct lu_device_type *ldt = d->ld_type;
2151
2152                 lu_device_put(d);
2153
2154                 /* each fini() returns next device in stack of layers
2155                  * * so we can avoid the recursion */
2156                 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2157                 ldt->ldt_ops->ldto_device_free(ctx, d);
2158                 type = ldt->ldt_obd_type;
2159                 type->typ_refcnt--;
2160                 class_put_type(type);
2161                 /* switch to the next device in the layer */
2162                 d = n;
2163         }
2164         m->mdt_child = NULL;
2165 }
2166
2167 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2168                                          const char *typename,
2169                                          struct lu_device *child,
2170                                          struct lustre_cfg *cfg)
2171 {
2172         struct obd_type       *type;
2173         struct lu_device_type *ldt;
2174         struct lu_device      *d;
2175         int rc;
2176
2177         /* find the type */
2178         type = class_get_type(typename);
2179         if (!type) {
2180                 CERROR("Unknown type: '%s'\n", typename);
2181                 GOTO(out, rc = -ENODEV);
2182         }
2183
2184         ldt = type->typ_lu;
2185         if (ldt == NULL) {
2186                 CERROR("type: '%s'\n", typename);
2187                 GOTO(out_type, rc = -EINVAL);
2188         }
2189
2190         ldt->ldt_obd_type = type;
2191         d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2192         if (IS_ERR(d)) {
2193                 CERROR("Cannot allocate device: '%s'\n", typename);
2194                 GOTO(out_type, rc = -ENODEV);
2195         }
2196
2197         LASSERT(child->ld_site);
2198         d->ld_site = child->ld_site;
2199
2200         type->typ_refcnt++;
2201         rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2202         if (rc) {
2203                 CERROR("can't init device '%s', rc %d\n", typename, rc);
2204                 GOTO(out_alloc, rc);
2205         }
2206         lu_device_get(d);
2207
2208         RETURN(d);
2209 out_alloc:
2210         ldt->ldt_ops->ldto_device_free(ctx, d);
2211         type->typ_refcnt--;
2212 out_type:
2213         class_put_type(type);
2214 out:
2215         return ERR_PTR(rc);
2216 }
2217
2218 static int mdt_stack_init(const struct lu_context *ctx,
2219                           struct mdt_device *m, struct lustre_cfg *cfg)
2220 {
2221         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
2222         struct lu_device  *tmp;
2223         struct md_device *md;
2224         int rc;
2225         ENTRY;
2226
2227         /* init the stack */
2228         tmp = mdt_layer_setup(ctx, LUSTRE_OSD0_NAME, d, cfg);
2229         if (IS_ERR(tmp)) {
2230                 RETURN(PTR_ERR(tmp));
2231         }
2232         m->mdt_bottom = lu2dt_dev(tmp);
2233         d = tmp;
2234         tmp = mdt_layer_setup(ctx, LUSTRE_MDD0_NAME, d, cfg);
2235         if (IS_ERR(tmp)) {
2236                 GOTO(out, rc = PTR_ERR(tmp));
2237         }
2238         d = tmp;
2239         md = lu2md_dev(d);
2240
2241         tmp = mdt_layer_setup(ctx, LUSTRE_CMM0_NAME, d, cfg);
2242         if (IS_ERR(tmp)) {
2243                 GOTO(out, rc = PTR_ERR(tmp));
2244         }
2245         d = tmp;
2246         /*set mdd upcall device*/
2247         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2248
2249         md = lu2md_dev(d);
2250         /*set cmm upcall device*/
2251         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2252
2253         m->mdt_child = lu2md_dev(d);
2254
2255         /* process setup config */
2256         tmp = &m->mdt_md_dev.md_lu_dev;
2257         rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2258         GOTO(out, rc);
2259 out:
2260         /* fini from last known good lu_device */
2261         if (rc)
2262                 mdt_stack_fini(ctx, m, d);
2263
2264         return rc;
2265 }
2266
2267 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2268 {
2269         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2270         struct lu_site   *ls = d->ld_site;
2271
2272         ENTRY;
2273
2274         mdt_fs_cleanup(ctx, m);
2275         ping_evictor_stop();
2276         mdt_stop_ptlrpc_service(m);
2277
2278         mdt_seq_fini(ctx, m);
2279         mdt_seq_fini_cli(m);
2280         
2281         mdt_fld_fini(ctx, m);
2282
2283         /* finish the stack */
2284         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2285
2286         if (m->mdt_namespace != NULL) {
2287                 ldlm_namespace_free(m->mdt_namespace, 0);
2288                 m->mdt_namespace = NULL;
2289         }
2290
2291         if (ls) {
2292                 lu_site_fini(ls);
2293                 OBD_FREE_PTR(ls);
2294         }
2295         LASSERT(atomic_read(&d->ld_ref) == 0);
2296         md_device_fini(&m->mdt_md_dev);
2297
2298         EXIT;
2299 }
2300
2301 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2302                      struct lu_device_type *t, struct lustre_cfg *cfg)
2303 {
2304         struct mdt_thread_info *info;
2305         struct obd_device      *obd;
2306         const char             *dev = lustre_cfg_string(cfg, 0);
2307         const char             *num = lustre_cfg_string(cfg, 2);
2308         struct lu_site         *s;
2309         int                     rc;
2310         ENTRY;
2311
2312         info = lu_context_key_get(ctx, &mdt_thread_key);
2313         LASSERT(info != NULL);
2314
2315         obd = class_name2obd(dev);
2316         LASSERT(obd);
2317       
2318         spin_lock_init(&m->mdt_transno_lock);
2319 #if 0
2320         /* FIXME: We need to load them from disk. But now fake it */
2321         m->mdt_last_transno = 1;
2322         m->mdt_last_committed = 1;
2323 #endif
2324         m->mdt_max_mdsize = MAX_MD_SIZE;
2325         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2326
2327         spin_lock_init(&m->mdt_epoch_lock);
2328         /* Temporary. should parse mount option. */
2329         m->mdt_opts.mo_user_xattr = 0;
2330         m->mdt_opts.mo_acl = 0;
2331         m->mdt_opts.mo_compat_resname = 0;
2332         obd->obd_replayable = 1;
2333
2334
2335         OBD_ALLOC_PTR(s);
2336         if (s == NULL)
2337                 RETURN(-ENOMEM);
2338
2339         md_device_init(&m->mdt_md_dev, t);
2340         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2341         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2342
2343         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2344         if (rc) {
2345                 CERROR("can't init lu_site, rc %d\n", rc);
2346                 GOTO(err_free_site, rc);
2347         }
2348
2349         /* init the stack */
2350         rc = mdt_stack_init(ctx, m, cfg);
2351         if (rc) {
2352                 CERROR("can't init device stack, rc %d\n", rc);
2353                 GOTO(err_fini_site, rc);
2354         }
2355         /* set server index */
2356         LASSERT(num);
2357         s->ls_node_id = simple_strtol(num, NULL, 10);
2358
2359         rc = mdt_fld_init(ctx, obd->obd_name, m);
2360         if (rc)
2361                 GOTO(err_fini_stack, rc);
2362
2363         rc = mdt_seq_init(ctx, obd->obd_name, m);
2364         if (rc)
2365                 GOTO(err_fini_fld, rc);
2366
2367         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
2368                  LUSTRE_MDT0_NAME"-%p", m);
2369         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
2370                                               LDLM_NAMESPACE_SERVER);
2371         if (m->mdt_namespace == NULL)
2372                 GOTO(err_fini_seq, rc = -ENOMEM);
2373
2374         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2375
2376         rc = mdt_start_ptlrpc_service(m);
2377         if (rc)
2378                 GOTO(err_free_ns, rc);
2379
2380         ping_evictor_start();
2381         rc = mdt_fs_setup(ctx, m);
2382         if (rc)
2383                 GOTO(err_stop_service, rc);
2384         RETURN(0);
2385
2386 err_stop_service:
2387         mdt_stop_ptlrpc_service(m);
2388 err_free_ns:
2389         ldlm_namespace_free(m->mdt_namespace, 0);
2390         m->mdt_namespace = NULL;
2391 err_fini_seq:
2392         mdt_seq_fini(ctx, m);
2393 err_fini_fld:
2394         mdt_fld_fini(ctx, m);
2395 err_fini_stack:
2396         mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2397 err_fini_site:
2398         lu_site_fini(s);
2399 err_free_site:
2400         OBD_FREE_PTR(s);
2401         
2402         md_device_fini(&m->mdt_md_dev);
2403         return (rc);
2404 }
2405
2406 /* used by MGS to process specific configurations */
2407 static int mdt_process_config(const struct lu_context *ctx,
2408                               struct lu_device *d, struct lustre_cfg *cfg)
2409 {
2410         struct mdt_device *m = mdt_dev(d);
2411         struct md_device *md_next  = m->mdt_child;
2412         struct lu_device *next = md2lu_dev(md_next);
2413         int err;
2414         ENTRY;
2415
2416         switch (cfg->lcfg_command) {
2417         case LCFG_ADD_MDC:
2418                 /*
2419                  * Add mdc hook to get first MDT uuid and connect it to
2420                  * ls->controller to use for seq manager.
2421                  */
2422                 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
2423                 if (err) {
2424                         CERROR("can't initialize controller export, "
2425                                "rc %d\n", err);
2426                 }
2427         default:
2428                 /* others are passed further */
2429                 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
2430                 break;
2431         }
2432         RETURN(err);
2433 }
2434
2435 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2436                                           const struct lu_object_header *hdr,
2437                                           struct lu_device *d)
2438 {
2439         struct mdt_object *mo;
2440
2441         ENTRY;
2442
2443         OBD_ALLOC_PTR(mo);
2444         if (mo != NULL) {
2445                 struct lu_object *o;
2446                 struct lu_object_header *h;
2447
2448                 o = &mo->mot_obj.mo_lu;
2449                 h = &mo->mot_header;
2450                 lu_object_header_init(h);
2451                 lu_object_init(o, h, d);
2452                 lu_object_add_top(h, o);
2453                 o->lo_ops = &mdt_obj_ops;
2454                 RETURN(o);
2455         } else
2456                 RETURN(NULL);
2457 }
2458
2459 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2460 {
2461         struct mdt_device *d = mdt_dev(o->lo_dev);
2462         struct lu_device  *under;
2463         struct lu_object  *below;
2464         int                rc = 0;
2465         ENTRY;
2466
2467         CDEBUG(D_INODE, "object init, fid = "DFID"\n",
2468                PFID(lu_object_fid(o)));
2469
2470         under = &d->mdt_child->md_lu_dev;
2471         below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2472         if (below != NULL) {
2473                 lu_object_add(o, below);
2474         } else
2475                 rc = -ENOMEM;
2476         RETURN(rc);
2477 }
2478
2479 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2480 {
2481         struct mdt_object *mo = mdt_obj(o);
2482         struct lu_object_header *h;
2483         ENTRY;
2484
2485         h = o->lo_header;
2486         CDEBUG(D_INODE, "object free, fid = "DFID"\n",
2487                PFID(lu_object_fid(o)));
2488
2489         lu_object_fini(o);
2490         lu_object_header_fini(h);
2491         OBD_FREE_PTR(mo);
2492         EXIT;
2493 }
2494
2495 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
2496                             lu_printer_t p, const struct lu_object *o)
2497 {
2498         return (*p)(ctxt, cookie, LUSTRE_MDT0_NAME"-object@%p", o);
2499 }
2500
2501 static struct lu_device_operations mdt_lu_ops = {
2502         .ldo_object_alloc   = mdt_object_alloc,
2503         .ldo_process_config = mdt_process_config
2504 };
2505
2506 static struct lu_object_operations mdt_obj_ops = {
2507         .loo_object_init    = mdt_object_init,
2508         .loo_object_free    = mdt_object_free,
2509         .loo_object_print   = mdt_object_print
2510 };
2511
2512 /* mds_connect_internal */
2513 static int mdt_connect_internal(const struct lu_context *ctx,
2514                                 struct mdt_device *mdt,
2515                                 struct obd_export *exp,
2516                                 struct obd_uuid *cluuid,
2517                                 struct obd_connect_data *data)
2518 {
2519         struct mdt_export_data *med = &exp->exp_mdt_data;
2520         struct mdt_client_data *mcd;
2521         int rc;
2522
2523         if (data != NULL) {
2524                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2525                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2526
2527                 /* If no known bits (which should not happen, probably,
2528                    as everybody should support LOOKUP and UPDATE bits at least)
2529                    revert to compat mode with plain locks. */
2530                 if (!data->ocd_ibits_known &&
2531                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
2532                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2533
2534                 if (!mdt->mdt_opts.mo_acl)
2535                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2536
2537                 if (!mdt->mdt_opts.mo_user_xattr)
2538                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2539
2540                 exp->exp_connect_flags = data->ocd_connect_flags;
2541                 data->ocd_version = LUSTRE_VERSION_CODE;
2542                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
2543         }
2544
2545         if (mdt->mdt_opts.mo_acl &&
2546             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2547                 CWARN("%s: MDS requires ACL support but client does not\n",
2548                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2549                 return -EBADE;
2550         }
2551
2552         OBD_ALLOC_PTR(mcd);
2553         if (mcd != NULL) {
2554                 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2555                 med->med_mcd = mcd;
2556                 rc = mdt_client_add(ctx, mdt, med, -1);
2557                 if (rc != 0)
2558                         OBD_FREE_PTR(mcd);
2559         } else
2560                 rc = -ENOMEM;
2561
2562         return rc;
2563 }
2564
2565 /* mds_connect copy */
2566 static int mdt_obd_connect(const struct lu_context *ctx,
2567                            struct lustre_handle *conn, struct obd_device *obd,
2568                            struct obd_uuid *cluuid,
2569                            struct obd_connect_data *data)
2570 {
2571         struct obd_export      *exp;
2572         struct mdt_device      *mdt;
2573         int                     rc;
2574         ENTRY;
2575
2576         LASSERT(ctx != NULL);
2577         if (!conn || !obd || !cluuid)
2578                 RETURN(-EINVAL);
2579
2580         mdt = mdt_dev(obd->obd_lu_dev);
2581
2582         rc = class_connect(conn, obd, cluuid);
2583         if (rc)
2584                 RETURN(rc);
2585
2586         exp = class_conn2export(conn);
2587         LASSERT(exp != NULL);
2588
2589         rc = mdt_connect_internal(ctx, mdt, exp, cluuid, data);
2590         if (rc != 0)
2591                 class_disconnect(exp);
2592         else
2593                 class_export_put(exp);
2594
2595         RETURN(rc);
2596 }
2597
2598 static int mdt_obd_disconnect(struct obd_export *exp)
2599 {
2600         int rc;
2601         ENTRY;
2602
2603         LASSERT(exp);
2604         class_export_get(exp);
2605
2606         /* Disconnect early so that clients can't keep using export */
2607         rc = class_disconnect(exp);
2608         //ldlm_cancel_locks_for_export(exp);
2609
2610         /* complete all outstanding replies */
2611         spin_lock(&exp->exp_lock);
2612         while (!list_empty(&exp->exp_outstanding_replies)) {
2613                 struct ptlrpc_reply_state *rs =
2614                         list_entry(exp->exp_outstanding_replies.next,
2615                                    struct ptlrpc_reply_state, rs_exp_list);
2616                 struct ptlrpc_service *svc = rs->rs_service;
2617
2618                 spin_lock(&svc->srv_lock);
2619                 list_del_init(&rs->rs_exp_list);
2620                 ptlrpc_schedule_difficult_reply(rs);
2621                 spin_unlock(&svc->srv_lock);
2622         }
2623         spin_unlock(&exp->exp_lock);
2624
2625         class_export_put(exp);
2626         RETURN(rc);
2627 }
2628
2629 /* FIXME: Can we avoid using these two interfaces? */
2630 static int mdt_init_export(struct obd_export *exp)
2631 {
2632         struct mdt_export_data *med = &exp->exp_mdt_data;
2633         ENTRY;
2634
2635         INIT_LIST_HEAD(&med->med_open_head);
2636         spin_lock_init(&med->med_open_lock);
2637         exp->exp_connecting = 1;
2638         RETURN(0);
2639 }
2640
2641 static int mdt_destroy_export(struct obd_export *export)
2642 {
2643         struct mdt_export_data *med;
2644         struct obd_device *obd = export->exp_obd;
2645         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2646         struct mdt_thread_info *info;
2647         struct lu_context ctxt;
2648         int rc = 0;
2649         ENTRY;
2650
2651         med = &export->exp_mdt_data;
2652         target_destroy_export(export);
2653
2654         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
2655                 RETURN(0);
2656
2657         rc = lu_context_init(&ctxt, LCT_MD_THREAD);
2658         if (rc)
2659                 RETURN(rc);
2660
2661         lu_context_enter(&ctxt);
2662
2663         info = lu_context_key_get(&ctxt, &mdt_thread_key);
2664         LASSERT(info != NULL);
2665         memset(info, 0, sizeof *info);
2666         /* Close any open files (which may also cause orphan unlinking). */
2667         spin_lock(&med->med_open_lock);
2668         while (!list_empty(&med->med_open_head)) {
2669                 struct list_head *tmp = med->med_open_head.next;
2670                 struct mdt_file_data *mfd =
2671                         list_entry(tmp, struct mdt_file_data, mfd_list);
2672                 struct mdt_object *o = mfd->mfd_object;
2673
2674                 /* Remove mfd handle so it can't be found again.
2675                  * We are consuming the mfd_list reference here. */
2676                 class_handle_unhash(&mfd->mfd_handle);
2677                 list_del_init(&mfd->mfd_list);
2678                 spin_unlock(&med->med_open_lock);
2679                 mdt_mfd_close(&ctxt, mdt, mfd, &info->mti_attr);
2680                 /* TODO: if we close the unlinked file,
2681                  * we need to remove it's objects from OST */
2682                 mdt_object_put(&ctxt, o);
2683                 spin_lock(&med->med_open_lock);
2684         }
2685         spin_unlock(&med->med_open_lock);
2686         mdt_client_free(&ctxt, mdt, med);
2687
2688         lu_context_exit(&ctxt);
2689         lu_context_fini(&ctxt);
2690
2691         RETURN(rc);
2692 }
2693
2694 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
2695                       enum md_upcall_event ev)
2696 {
2697         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
2698         struct md_device  *next  = m->mdt_child;
2699         int rc = 0;
2700         ENTRY;
2701
2702         switch (ev) {
2703                 case MD_LOV_SYNC:
2704                         rc = next->md_ops->mdo_get_maxsize(ctx, next,
2705                                         &m->mdt_max_mdsize, 
2706                                         &m->mdt_max_cookiesize);
2707                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
2708                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
2709                         break;
2710                 default:
2711                         CERROR("invalid event\n");
2712                         rc = -EINVAL;
2713                         break;
2714         }
2715         RETURN(rc);
2716 }
2717
2718
2719 static struct obd_ops mdt_obd_device_ops = {
2720         .o_owner          = THIS_MODULE,
2721         .o_connect        = mdt_obd_connect,
2722         .o_disconnect     = mdt_obd_disconnect,
2723         .o_init_export    = mdt_init_export,
2724         .o_destroy_export = mdt_destroy_export,
2725 };
2726
2727 static struct lu_device* mdt_device_fini(const struct lu_context *ctx, 
2728                                          struct lu_device *d)
2729 {
2730         struct mdt_device *m = mdt_dev(d);
2731
2732         mdt_fini(ctx, m);
2733         RETURN(NULL);
2734 }
2735
/*
 * ->ldto_device_free() method: release the memory of the mdt device that
 * embeds @d.
 */
static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
{
        struct mdt_device *mdt = mdt_dev(d);

        OBD_FREE_PTR(mdt);
}
2742
2743 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
2744                                           struct lu_device_type *t,
2745                                           struct lustre_cfg *cfg)
2746 {
2747         struct lu_device  *l;
2748         struct mdt_device *m;
2749
2750         OBD_ALLOC_PTR(m);
2751         if (m != NULL) {
2752                 int result;
2753
2754                 l = &m->mdt_md_dev.md_lu_dev;
2755                 result = mdt_init0(ctx, m, t, cfg);
2756                 if (result != 0) {
2757                         OBD_FREE_PTR(m);
2758                         l = ERR_PTR(result);
2759                 }
2760                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
2761         } else
2762                 l = ERR_PTR(-ENOMEM);
2763         return l;
2764 }
2765
2766 /*
2767  * context key constructor/destructor
2768  */
2769 static void *mdt_thread_init(const struct lu_context *ctx,
2770                              struct lu_context_key *key)
2771 {
2772         struct mdt_thread_info *info;
2773
2774         /*
2775          * check that no high order allocations are incurred.
2776          */
2777         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
2778         OBD_ALLOC_PTR(info);
2779         if (info == NULL)
2780                 info = ERR_PTR(-ENOMEM);
2781         return info;
2782 }
2783
/*
 * Destructor of mdt_thread_key: free the struct mdt_thread_info passed in
 * as @data.
 */
static void mdt_thread_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *mti = data;

        OBD_FREE_PTR(mti);
}
2790
2791 struct lu_context_key mdt_thread_key = {
2792         .lct_tags = LCT_MD_THREAD,
2793         .lct_init = mdt_thread_init,
2794         .lct_fini = mdt_thread_fini
2795 };
2796
2797 static void *mdt_txn_init(const struct lu_context *ctx,
2798                              struct lu_context_key *key)
2799 {
2800         struct mdt_txn_info *txi;
2801
2802         /*
2803          * check that no high order allocations are incurred.
2804          */
2805         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
2806         OBD_ALLOC_PTR(txi);
2807         if (txi == NULL)
2808                 txi = ERR_PTR(-ENOMEM);
2809         return txi;
2810 }
2811
/*
 * Destructor of mdt_txn_key: free the struct mdt_txn_info passed in as
 * @data.
 */
static void mdt_txn_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txn_info = data;

        OBD_FREE_PTR(txn_info);
}
2818
2819 struct lu_context_key mdt_txn_key = {
2820         .lct_tags = LCT_TX_HANDLE,
2821         .lct_init = mdt_txn_init,
2822         .lct_fini = mdt_txn_fini
2823 };
2824
2825
2826 static int mdt_type_init(struct lu_device_type *t)
2827 {
2828         int rc;
2829
2830         rc = lu_context_key_register(&mdt_thread_key);
2831         if (rc == 0)
2832                 rc = lu_context_key_register(&mdt_txn_key);
2833         return rc;
2834 }
2835
2836 static void mdt_type_fini(struct lu_device_type *t)
2837 {
2838         lu_context_key_degister(&mdt_thread_key);
2839         lu_context_key_degister(&mdt_txn_key);
2840 }
2841
2842 static struct lu_device_type_operations mdt_device_type_ops = {
2843         .ldto_init = mdt_type_init,
2844         .ldto_fini = mdt_type_fini,
2845
2846         .ldto_device_alloc = mdt_device_alloc,
2847         .ldto_device_free  = mdt_device_free,
2848         .ldto_device_fini  = mdt_device_fini
2849 };
2850
2851 static struct lu_device_type mdt_device_type = {
2852         .ldt_tags     = LU_DEVICE_MD,
2853         .ldt_name     = LUSTRE_MDT0_NAME,
2854         .ldt_ops      = &mdt_device_type_ops,
2855         .ldt_ctx_tags = LCT_MD_THREAD
2856 };
2857
2858 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
2859         { 0 }
2860 };
2861
2862 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
2863         { 0 }
2864 };
2865
2866 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
2867
2868 static int __init mdt_mod_init(void)
2869 {
2870         int result;
2871         struct lprocfs_static_vars lvars;
2872
2873         mdt_num_threads = MDT_NUM_THREADS;
2874         lprocfs_init_vars(mdt, &lvars);
2875         result = class_register_type(&mdt_obd_device_ops, NULL,
2876                                      lvars.module_vars, LUSTRE_MDT0_NAME,
2877                                      &mdt_device_type);
2878         return result;
2879 }
2880
2881 static void __exit mdt_mod_exit(void)
2882 {
2883         class_unregister_type(LUSTRE_MDT0_NAME);
2884 }
2885
2886
/*
 * Build one designated entry of a struct mdt_handler table.
 *
 * The entry is placed at index (prefix_opc - prefix_base), so a table is
 * indexed by opcode relative to the first opcode of its group. The fail
 * id is formed as OBD_FAIL_<prefix>_<opc><suffix>; @suffix may be empty.
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, handler, format)     \
[prefix ## _ ## opc - prefix ## _ ## base] = {                          \
        .mh_name    = #opc,                                             \
        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
        .mh_opc     = prefix ## _  ## opc,                              \
        .mh_flags   = flags,                                            \
        .mh_act     = handler,                                          \
        .mh_fmt     = format                                            \
}
2896
/*
 * MDS_* request handlers: opcodes are counted from MDS_GETATTR and the
 * fail id carries the _NET suffix.
 *
 * DEF_MDT_HNDL   - request format given explicitly
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
/*
 * DEF_MDT_HNDL_F - request with a format known in advance (RQF_MDS_<name>)
 */
#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
/*
 * DEF_MDT_HNDL_0 - request with a format we do not yet know (NULL)
 */
#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
2909
2910 static struct mdt_handler mdt_mds_ops[] = {
2911 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
2912 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
2913 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
2914 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR,      mdt_getattr),
2915 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
2916 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
2917                                           SETXATTR,     mdt_setxattr),
2918 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
2919 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
2920 DEF_MDT_HNDL_F(0                        |MUTABOR,
2921                                           REINT,        mdt_reint),
2922 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
2923 DEF_MDT_HNDL_0(0,                         DONE_WRITING, mdt_done_writing),
2924 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
2925 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
2926 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
2927 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
2928 };
2929
/*
 * OBD_* request handlers: opcodes are counted from OBD_PING, the fail id
 * carries the _NET suffix and no request format is attached.
 */
#define DEF_OBD_HNDL(flags, name, fn)                                   \
        DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
2932
2933
2934 static struct mdt_handler mdt_obd_ops[] = {
2935         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
2936         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
2937         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
2938 };
2939
/*
 * LDLM_* request handlers: opcodes are counted from LDLM_ENQUEUE and the
 * fail id has no suffix (the suffix argument is deliberately left empty).
 *
 * DEF_DLM_HNDL_0 - no request format attached
 * DEF_DLM_HNDL_F - request format RQF_LDLM_<name>
 */
#define DEF_DLM_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
#define DEF_DLM_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
2944
2945 static struct mdt_handler mdt_dlm_ops[] = {
2946         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
2947         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
2948         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
2949         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
2950 };
2951
2952 static struct mdt_handler mdt_llog_ops[] = {
2953 };
2954
2955 static struct mdt_opc_slice mdt_regular_handlers[] = {
2956         {
2957                 .mos_opc_start = MDS_GETATTR,
2958                 .mos_opc_end   = MDS_LAST_OPC,
2959                 .mos_hs        = mdt_mds_ops
2960         },
2961         {
2962                 .mos_opc_start = OBD_PING,
2963                 .mos_opc_end   = OBD_LAST_OPC,
2964                 .mos_hs        = mdt_obd_ops
2965         },
2966         {
2967                 .mos_opc_start = LDLM_ENQUEUE,
2968                 .mos_opc_end   = LDLM_LAST_OPC,
2969                 .mos_hs        = mdt_dlm_ops
2970         },
2971         {
2972                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
2973                 .mos_opc_end   = LLOG_LAST_OPC,
2974                 .mos_hs        = mdt_llog_ops
2975         },
2976         {
2977                 .mos_hs        = NULL
2978         }
2979 };
2980
2981 static struct mdt_handler mdt_readpage_ops[] = {
2982         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
2983
2984         /*
2985          * XXX: this is ugly and should be fixed one day, see mdc_close() for
2986          * detailed comment. --umka
2987          */
2988         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
2989 };
2990
2991 static struct mdt_opc_slice mdt_readpage_handlers[] = {
2992         {
2993                 .mos_opc_start = MDS_GETATTR,
2994                 .mos_opc_end   = MDS_LAST_OPC,
2995                 .mos_hs        = mdt_readpage_ops
2996         },
2997         {
2998                 .mos_hs        = NULL
2999         }
3000 };
3001
3002 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3003 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT0_NAME")");
3004 MODULE_LICENSE("GPL");
3005
3006 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
3007                 "number of mdt service threads to start");
3008
3009 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);