Whamcloud - gitweb
6a4d131b9042b5c8d17fd2e49dc35567be753fa2
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * MDT_FAIL_CHECK
46  */
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
53 #include <obd.h>
54 /* lu2dt_dev() */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
60 #include <lustre_param.h>
/* Set up in mdt_mod_init(). */
unsigned long mdt_num_threads;
65
66 /* ptlrpc request handler for MDT. All handlers are
67  * grouped into several slices - struct mdt_opc_slice,
68  * and stored in an array - mdt_handlers[].
69  */
70 struct mdt_handler {
71         /* The name of this handler. */
72         const char *mh_name;
73         /* Fail id for this handler, checked at the beginning of this handler*/
74         int         mh_fail_id;
75         /* Operation code for this handler */
76         __u32       mh_opc;
77         /* flags are listed in enum mdt_handler_flags below. */
78         __u32       mh_flags;
79         /* The actual handler function to execute. */
80         int (*mh_act)(struct mdt_thread_info *info);
81         /* Request format for this request. */
82         const struct req_format *mh_fmt;
83 };
84
/* Bit flags stored in mdt_handler::mh_flags. */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         */
        HABEO_CORPUS = 0x1,
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = 0x2,
        /*
         * "habeo refero" == "I have a reply"
         *
         * The reply format of this request is fixed, so generic code can
         * pack the reply message.
         */
        HABEO_REFERO = 0x4,
        /*
         * "mutabor" == "I shall modify"
         *
         * The request modifies something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client as early as
         * possible.
         */
        MUTABOR      = 0x8
};
114
115 struct mdt_opc_slice {
116         __u32               mos_opc_start;
117         int                 mos_opc_end;
118         struct mdt_handler *mos_hs;
119 };
120
121 static struct mdt_opc_slice mdt_regular_handlers[];
122 static struct mdt_opc_slice mdt_readpage_handlers[];
123 static struct mdt_opc_slice mdt_seq_handlers[];
124 static struct mdt_opc_slice mdt_fld_handlers[];
125
126 static struct mdt_device *mdt_dev(struct lu_device *d);
127 static int mdt_regular_handle(struct ptlrpc_request *req);
128 static int mdt_recovery_handle(struct ptlrpc_request *req);
129 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
130
131 static struct lu_object_operations mdt_obj_ops;
132
133 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
134 {
135         if (!rep)
136                 return 0;
137         return (rep->lock_policy_res1 & flag);
138 }
139
140 void mdt_clear_disposition(struct mdt_thread_info *info,
141                            struct ldlm_reply *rep, int flag)
142 {
143         if (info)
144                 info->mti_opdata &= ~flag;
145         if (rep)
146                 rep->lock_policy_res1 &= ~flag;
147 }
148
149 void mdt_set_disposition(struct mdt_thread_info *info,
150                          struct ldlm_reply *rep, int flag)
151 {
152         if (info)
153                 info->mti_opdata |= flag;
154         if (rep)
155                 rep->lock_policy_res1 |= flag;
156 }
157
158 static int mdt_getstatus(struct mdt_thread_info *info)
159 {
160         struct md_device *next  = info->mti_mdt->mdt_child;
161         int               rc;
162         struct mdt_body  *body;
163
164         ENTRY;
165
166         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
167                 rc = -ENOMEM;
168         } else {
169                 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
170                 rc = next->md_ops->mdo_root_get(info->mti_env, next,
171                                                 &body->fid1);
172                 if (rc == 0)
173                         body->valid |= OBD_MD_FLID;
174         }
175
176         RETURN(rc);
177 }
178
179 static int mdt_statfs(struct mdt_thread_info *info)
180 {
181         struct md_device  *next  = info->mti_mdt->mdt_child;
182         struct obd_statfs *osfs;
183         int                rc;
184
185         ENTRY;
186
187         /* This will trigger a watchdog timeout */
188         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
189                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
190
191
192         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
193                 rc = -ENOMEM;
194         } else {
195                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
196                 /* XXX max_age optimisation is needed here. See mds_statfs */
197                 rc = next->md_ops->mdo_statfs(info->mti_env, next,
198                                               &info->mti_u.ksfs);
199                 statfs_pack(osfs, &info->mti_u.ksfs);
200         }
201
202         RETURN(rc);
203 }
204
205 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
206                         struct mdt_object *o)
207 {
208         /* Check if Size-on-MDS is enabled. */
209         if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
210                 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
211                 b->size = attr->la_size;
212                 b->blocks = attr->la_blocks;
213         }
214 }
215
216 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
217                         const struct lu_fid *fid)
218 {
219         /*XXX should pack the reply body according to lu_valid*/
220         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
221                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
222                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
223                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
224
225         if (!S_ISREG(attr->la_mode))
226                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
227
228         b->atime      = attr->la_atime;
229         b->mtime      = attr->la_mtime;
230         b->ctime      = attr->la_ctime;
231         b->mode       = attr->la_mode;
232         b->size       = attr->la_size;
233         b->blocks     = attr->la_blocks;
234         b->uid        = attr->la_uid;
235         b->gid        = attr->la_gid;
236         b->flags      = attr->la_flags;
237         b->nlink      = attr->la_nlink;
238         b->rdev       = attr->la_rdev;
239
240         if (fid) {
241                 b->fid1 = *fid;
242                 b->valid |= OBD_MD_FLID;
243                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
244                                 PFID(fid), b->nlink, b->mode, b->size);
245         }
246 }
247
248 static inline int mdt_body_has_lov(const struct lu_attr *la,
249                                    const struct mdt_body *body)
250 {
251         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
252                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
253 }
254
255 static int mdt_getattr_internal(struct mdt_thread_info *info,
256                                 struct mdt_object *o)
257 {
258         struct md_object        *next = mdt_object_child(o);
259         struct mdt_device       *mdt = info->mti_mdt;
260         const struct mdt_body   *reqbody = info->mti_body;
261         struct ptlrpc_request   *req = mdt_info_req(info);
262         struct md_attr          *ma = &info->mti_attr;
263         struct lu_attr          *la = &ma->ma_attr;
264         struct req_capsule      *pill = &info->mti_pill;
265         const struct lu_env     *env = info->mti_env;
266         struct mdt_body         *repbody;
267         struct lu_buf           *buffer = &info->mti_buf;
268         int                     rc;
269         ENTRY;
270
271         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
272                 RETURN(-ENOMEM);
273
274         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
275         repbody->eadatasize = 0;
276         repbody->aclsize = 0;
277
278         if (reqbody->valid & OBD_MD_MEA) {
279                 /* Assumption: MDT_MD size is enough for lmv size FIXME */
280                 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
281                 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
282                                                              RCL_SERVER);
283                 ma->ma_need = MA_INODE | MA_LMV;
284         } else {
285                 ma->ma_need = MA_INODE | MA_LOV ;
286                 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
287                 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
288                                                              RCL_SERVER);
289         }
290         rc = mo_attr_get(env, next, ma);
291         if (rc == -EREMOTE) {
292                 /* This object is located on remote node.*/
293                 repbody->fid1 = *mdt_object_fid(o);
294                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
295                 RETURN(0);
296         } else if (rc) {
297                 CERROR("getattr error for "DFID": %d\n",
298                         PFID(mdt_object_fid(o)), rc);
299                 RETURN(rc);
300         }
301
302         if (ma->ma_valid & MA_INODE) {
303                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
304                 mdt_body_reverse_idmap(info, repbody);
305         } else {
306                 RETURN(-EFAULT);
307         }
308
309         if (mdt_body_has_lov(la, reqbody)) {
310                 if (ma->ma_valid & MA_LOV) {
311                         LASSERT(ma->ma_lmm_size);
312                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
313                         repbody->eadatasize = ma->ma_lmm_size;
314                         if (S_ISDIR(la->la_mode))
315                                 repbody->valid |= OBD_MD_FLDIREA;
316                         else
317                                 repbody->valid |= OBD_MD_FLEASIZE;
318                 }
319                 if (ma->ma_valid & MA_LMV) {
320                         LASSERT(S_ISDIR(la->la_mode));
321                         repbody->eadatasize = ma->ma_lmv_size;
322                         repbody->valid |= OBD_MD_FLDIREA;
323                         repbody->valid |= OBD_MD_MEA;
324                 }
325         } else if (S_ISLNK(la->la_mode) &&
326                           reqbody->valid & OBD_MD_LINKNAME) {
327                 buffer->lb_buf = ma->ma_lmm;
328                 buffer->lb_len = ma->ma_lmm_size;
329                 rc = mo_readlink(env, next, buffer);
330                 if (rc <= 0) {
331                         CERROR("readlink failed: %d\n", rc);
332                         rc = -EFAULT;
333                 } else {
334                         repbody->valid |= OBD_MD_LINKNAME;
335                         repbody->eadatasize = rc + 1;
336                         ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
337                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
338                                         (char*)ma->ma_lmm, rc);
339                         rc = 0;
340                 }
341         }
342
343         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
344                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
345                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
346                 repbody->valid |= OBD_MD_FLMODEASIZE;
347                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
348                                 "MAX_COOKIE to : %d:%d\n",
349                                 repbody->max_mdsize,
350                                 repbody->max_cookiesize);
351         }
352
353         if (reqbody->valid & OBD_MD_FLRMTPERM) {
354                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
355                 /* mdt_getattr_lock only */
356                 rc = mdt_pack_remote_perm(info, o, buffer);
357                 if (rc)
358                         RETURN(rc);
359                 repbody->valid |= OBD_MD_FLRMTPERM;
360                 repbody->aclsize = sizeof(struct mdt_remote_perm);
361         }
362 #ifdef CONFIG_FS_POSIX_ACL
363         else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
364                  (reqbody->valid & OBD_MD_FLACL)) {
365                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
366                 buffer->lb_len = req_capsule_get_size(pill,
367                                                       &RMF_ACL, RCL_SERVER);
368                 if (buffer->lb_len > 0) {
369                         rc = mo_xattr_get(env, next, buffer,
370                                           XATTR_NAME_ACL_ACCESS);
371                         if (rc < 0) {
372                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
373                                         rc = 0;
374                                 else
375                                         CERROR("got acl size: %d\n", rc);
376                         } else {
377                                 repbody->aclsize = rc;
378                                 repbody->valid |= OBD_MD_FLACL;
379                         }
380                 }
381         }
382 #endif
383
384         if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) {
385                 struct lustre_capa *capa;
386
387                 spin_lock(&capa_lock);
388                 info->mti_capa_key = *red_capa_key(mdt);
389                 spin_unlock(&capa_lock);
390
391                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
392                 LASSERT(capa);
393                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
394                 rc = mo_capa_get(env, next, capa);
395                 if (rc)
396                         RETURN(rc);
397                 repbody->valid |= OBD_MD_FLMDSCAPA;
398         }
399
400         RETURN(rc);
401 }
402
403 static int mdt_getattr(struct mdt_thread_info *info)
404 {
405         int rc;
406         struct mdt_object *obj;
407         struct mdt_body *reqbody;
408
409         obj = info->mti_object;
410         LASSERT(obj != NULL);
411         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
412         ENTRY;
413
414         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
415         if (reqbody == NULL)
416                 RETURN(-EFAULT);
417
418         if (reqbody->valid & OBD_MD_FLRMTPERM) {
419                 rc = mdt_init_ucred(info, reqbody);
420                 if (rc)
421                         RETURN(rc);
422         }
423
424         rc = mdt_getattr_internal(info, obj);
425         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
426         if (reqbody->valid & OBD_MD_FLRMTPERM)
427                 mdt_exit_ucred(info);
428         RETURN(rc);
429 }
430
431 static int mdt_is_subdir(struct mdt_thread_info *info)
432 {
433         struct mdt_object   *obj = info->mti_object;
434         struct req_capsule  *pill = &info->mti_pill;
435         struct mdt_body     *repbody;
436         int                  rc;
437
438         obj = info->mti_object;
439         LASSERT(obj != NULL);
440         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
441         ENTRY;
442
443         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
444
445         /*
446          * We save last checked parent fid to @repbody->fid1 for remote
447          * directory case.
448          */
449         LASSERT(fid_is_sane(&info->mti_body->fid2));
450         rc = mdo_is_subdir(info->mti_env, mdt_object_child(obj),
451                            &info->mti_body->fid2, &repbody->fid1);
452         if (rc < 0)
453                 RETURN(rc);
454
455         /*
456          * Save error code to ->mode. Later it it is used for detecting the case
457          * of remote subdir.
458          */
459         repbody->mode = rc;
460         repbody->valid = OBD_MD_FLMODE;
461
462         if (rc == EREMOTE)
463                 repbody->valid |= OBD_MD_FLID;
464
465
466         RETURN(0);
467 }
468
469 /*
470  * UPDATE lock should be taken against parent, and be release before exit;
471  * child_bits lock should be taken against child, and be returned back:
472  *            (1)normal request should release the child lock;
473  *            (2)intent request will grant the lock to client.
474  */
475 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
476                                  struct mdt_lock_handle *lhc,
477                                  __u64 child_bits,
478                                  struct ldlm_reply *ldlm_rep)
479 {
480         struct ptlrpc_request *req = mdt_info_req(info);
481         struct mdt_object     *parent = info->mti_object;
482         struct mdt_object     *child;
483         struct md_object      *next = mdt_object_child(info->mti_object);
484         struct lu_fid         *child_fid = &info->mti_tmp_fid1;
485         int                    is_resent, rc;
486         const char            *name;
487         struct mdt_lock_handle *lhp;
488         struct ldlm_lock      *lock;
489         ENTRY;
490
491         is_resent = lustre_handle_is_used(&lhc->mlh_lh);
492         if (is_resent)
493                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
494
495         LASSERT(info->mti_object != NULL);
496         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
497         if (name == NULL)
498                 RETURN(-EFAULT);
499
500         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
501                         PFID(mdt_object_fid(parent)), name, ldlm_rep);
502
503         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
504         if (strlen(name) == 0) {
505                 /* only getattr on the child. parent is on another node. */
506                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
507                 child = parent;
508                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
509                        ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
510
511                 if (is_resent) {
512                         /* Do not take lock for resent case. */
513                         lock = ldlm_handle2lock(&lhc->mlh_lh);
514                         if (!lock) {
515                                 CERROR("Invalid lock handle "LPX64"\n",
516                                        lhc->mlh_lh.cookie);
517                                 LBUG();
518                         }
519                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
520                                                 &lock->l_resource->lr_name));
521                         LDLM_LOCK_PUT(lock);
522                         rc = 0;
523                 } else {
524                         mdt_lock_handle_init(lhc);
525                         lhc->mlh_mode = LCK_CR;
526
527                         /*
528                          * Object's name is on another MDS, no lookup lock is
529                          * needed here but update is.
530                          */
531                         child_bits &= ~MDS_INODELOCK_LOOKUP;
532                         child_bits |= MDS_INODELOCK_UPDATE;
533                         rc = mdt_object_lock(info, child, lhc, child_bits);
534                 }
535                 if (rc == 0) {
536                         /* Finally, we can get attr for child. */
537                         rc = mdt_getattr_internal(info, child);
538                         if (rc != 0)
539                                 mdt_object_unlock(info, child, lhc, 1);
540                 }
541                 GOTO(out, rc);
542         }
543
544         /*step 1: lock parent */
545         lhp = &info->mti_lh[MDT_LH_PARENT];
546         lhp->mlh_mode = LCK_CR;
547         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
548         if (rc != 0)
549                 RETURN(rc);
550
551         /*step 2: lookup child's fid by name */
552         rc = mdo_lookup(info->mti_env, next, name, child_fid);
553         if (rc != 0) {
554                 if (rc == -ENOENT)
555                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
556                 GOTO(out_parent, rc);
557         } else
558                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
559         /*
560          *step 3: find the child object by fid & lock it.
561          *        regardless if it is local or remote.
562          */
563         child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
564                                 BYPASS_CAPA);
565         if (IS_ERR(child))
566                 GOTO(out_parent, rc = PTR_ERR(child));
567         if (is_resent) {
568                 /* Do not take lock for resent case. */
569                 lock = ldlm_handle2lock(&lhc->mlh_lh);
570                 if (!lock) {
571                         CERROR("Invalid lock handle "LPX64"\n",
572                                lhc->mlh_lh.cookie);
573                         LBUG();
574                 }
575                 LASSERT(fid_res_name_eq(child_fid,
576                                         &lock->l_resource->lr_name));
577                 LDLM_LOCK_PUT(lock);
578         } else {
579                 mdt_lock_handle_init(lhc);
580                 lhc->mlh_mode = LCK_CR;
581                 rc = mdt_object_cr_lock(info, child, lhc, child_bits);
582                 if (rc != 0)
583                         GOTO(out_child, rc);
584         }
585
586         /* finally, we can get attr for child. */
587         rc = mdt_getattr_internal(info, child);
588         if (rc != 0) {
589                 mdt_object_unlock(info, child, lhc, 1);
590         } else {
591                 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
592                 if (lock) {
593                         struct ldlm_res_id *res_id;
594                         struct mdt_body *repbody;
595                         struct lu_attr *ma;
596
597                         /* Debugging code. */
598                         res_id = &lock->l_resource->lr_name;
599                         LDLM_DEBUG(lock, "we will return this lock client\n");
600                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
601                                                  &lock->l_resource->lr_name),
602                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
603                                 (unsigned long)res_id->name[0],
604                                 (unsigned long)res_id->name[1],
605                                 (unsigned long)res_id->name[2],
606                                 PFID(mdt_object_fid(child)));
607
608                         /* Pack Size-on-MDS inode attributes to the body if
609                          * update lock is given. */
610                         repbody = req_capsule_server_get(&info->mti_pill,
611                                                          &RMF_MDT_BODY);
612                         ma = &info->mti_attr.ma_attr;
613                         if (lock->l_policy_data.l_inodebits.bits &
614                             MDS_INODELOCK_UPDATE)
615                                 mdt_pack_size2body(repbody, ma, child);
616                         LDLM_LOCK_PUT(lock);
617                 }
618
619
620         }
621         EXIT;
622 out_child:
623         mdt_object_put(info->mti_env, child);
624 out_parent:
625         mdt_object_unlock(info, parent, lhp, 1);
626 out:
627         return rc;
628 }
629
630 /* normal handler: should release the child lock */
631 static int mdt_getattr_name(struct mdt_thread_info *info)
632 {
633         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
634         struct mdt_body *reqbody;
635         int rc;
636
637         ENTRY;
638
639         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
640         if (reqbody == NULL)
641                 RETURN(-EFAULT);
642
643         rc = mdt_init_ucred(info, reqbody);
644         if (rc)
645                 RETURN(rc);
646
647         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
648         if (lustre_handle_is_used(&lhc->mlh_lh)) {
649                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
650                 lhc->mlh_lh.cookie = 0;
651         }
652         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
653         mdt_exit_ucred(info);
654         RETURN(rc);
655 }
656
657 static struct lu_device_operations mdt_lu_ops;
658
659 static int lu_device_is_mdt(struct lu_device *d)
660 {
661         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
662 }
663
664 static inline struct mdt_device *mdt_dev(struct lu_device *d)
665 {
666         LASSERT(lu_device_is_mdt(d));
667         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
668 }
669
670 static int mdt_connect(struct mdt_thread_info *info)
671 {
672         int rc;
673         struct ptlrpc_request *req;
674
675         req = mdt_info_req(info);
676         rc = target_handle_connect(req, mdt_recovery_handle);
677         if (rc == 0) {
678                 LASSERT(req->rq_export != NULL);
679                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
680                 rc = mdt_init_idmap(info);
681         }
682         return rc;
683 }
684
/*
 * Handler for a disconnect request: hands the request of @info over to
 * target_handle_disconnect() and returns its result.
 */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        struct ptlrpc_request *req = mdt_info_req(info);

        return target_handle_disconnect(req);
}
689
690 static int mdt_sendpage(struct mdt_thread_info *info,
691                         struct lu_rdpg *rdpg)
692 {
693         struct ptlrpc_request   *req = mdt_info_req(info);
694         struct ptlrpc_bulk_desc *desc;
695         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
696         int                      tmpcount;
697         int                      tmpsize;
698         int                      i;
699         int                      rc;
700         ENTRY;
701
702         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
703                                     MDS_BULK_PORTAL);
704         if (desc == NULL)
705                 GOTO(out, rc = -ENOMEM);
706
707         for (i = 0, tmpcount = rdpg->rp_count;
708                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
709                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
710                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
711         }
712
713         LASSERT(desc->bd_nob == rdpg->rp_count);
714         rc = ptlrpc_start_bulk_transfer(desc);
715         if (rc)
716                 GOTO(free_desc, rc);
717
718         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
719                 GOTO(abort_bulk, rc);
720
721         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
722         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
723         LASSERT (rc == 0 || rc == -ETIMEDOUT);
724
725         if (rc == 0) {
726                 if (desc->bd_success &&
727                     desc->bd_nob_transferred == rdpg->rp_count)
728                         GOTO(free_desc, rc);
729
730                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
731         }
732
733         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
734                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
735                   desc->bd_nob_transferred, rdpg->rp_count,
736                   req->rq_export->exp_client_uuid.uuid,
737                   req->rq_export->exp_connection->c_remote_uuid.uuid);
738
739         class_fail_export(req->rq_export);
740
741         EXIT;
742 abort_bulk:
743         ptlrpc_abort_bulk(desc);
744 free_desc:
745         ptlrpc_free_bulk(desc);
746 out:
747         return rc;
748 }
749
750 #ifdef HAVE_SPLIT_SUPPORT
751 /*
752  * Retrieve dir entry from the page and insert it to the
753  * slave object, actually, this should be in osd layer,
754  * but since it will not in the final product, so just do
755  * it here and do not define more moo api anymore for
756  * this.
757  */
758 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
759 {
760         struct mdt_object *object = info->mti_object;
761         struct lu_dirpage *dp;
762         struct lu_dirent *ent;
763         int rc = 0;
764
765         ENTRY;
766
767         /* Disable trans for this name insert, since it will
768          * include many trans for this */
769         info->mti_no_need_trans = 1;
770         kmap(page);
771         dp = page_address(page);
772         for (ent = lu_dirent_start(dp); ent != NULL;
773                           ent = lu_dirent_next(ent)) {
774                 struct lu_fid *lf = &ent->lde_fid;
775
776                 /* FIXME: multi-trans for this name insert */
777                 if (strncmp(ent->lde_name, ".", ent->lde_namelen) &&
778                     strncmp(ent->lde_name, "..", ent->lde_namelen)) {
779                         char *name;
780                         /* FIXME: Here we allocate name for each name,
781                          * maybe stupid, but can not find better way.
782                          * will find better way */
783                         OBD_ALLOC(name, ent->lde_namelen + 1);
784                         memcpy(name, ent->lde_name, ent->lde_namelen);
785                         rc = mdo_name_insert(info->mti_env,
786                                              md_object_next(&object->mot_obj),
787                                              name, lf, 0);
788                         OBD_FREE(name, ent->lde_namelen + 1);
789                         if (rc)
790                                 GOTO(out, rc);
791                 }
792         }
793 out:
794         kunmap(page);
795         RETURN(rc);
796 }
797
798 static int mdt_bulk_timeout(void *data)
799 {
800         ENTRY;
801
802         CERROR("mdt bulk transfer timeout \n");
803
804         RETURN(1);
805 }
806
807 static int mdt_writepage(struct mdt_thread_info *info)
808 {
809         struct ptlrpc_request   *req = mdt_info_req(info);
810         struct mdt_body         *reqbody;
811         struct l_wait_info      *lwi;
812         struct ptlrpc_bulk_desc *desc;
813         struct page             *page;
814         int                rc;
815         ENTRY;
816
817
818         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
819         if (reqbody == NULL)
820                 RETURN(-EFAULT);
821
822         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
823         if (!desc)
824                 RETURN(-ENOMEM);
825
826         /* allocate the page for the desc */
827         page = alloc_pages(GFP_KERNEL, 0);
828         if (!page)
829                 GOTO(desc_cleanup, rc = -ENOMEM);
830
831         CDEBUG(D_INFO, "Received page offset %d size %d \n",
832                         (int)reqbody->size, (int)reqbody->nlink);
833
834         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
835                               (int)reqbody->nlink);
836
837         /* FIXME: following parts are copied from ost_brw_write */
838
839         /* Check if client was evicted while we were doing i/o before touching
840            network */
841         OBD_ALLOC_PTR(lwi);
842         if (!lwi)
843                 GOTO(cleanup_page, rc = -ENOMEM);
844
845         if (desc->bd_export->exp_failed)
846                 rc = -ENOTCONN;
847         else
848                 rc = ptlrpc_start_bulk_transfer (desc);
849         if (rc == 0) {
850                 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
851                                             mdt_bulk_timeout, desc);
852                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
853                                   desc->bd_export->exp_failed, lwi);
854                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
855                 if (rc == -ETIMEDOUT) {
856                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
857                         ptlrpc_abort_bulk(desc);
858                 } else if (desc->bd_export->exp_failed) {
859                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
860                         rc = -ENOTCONN;
861                         ptlrpc_abort_bulk(desc);
862                 } else if (!desc->bd_success ||
863                            desc->bd_nob_transferred != desc->bd_nob) {
864                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
865                                   desc->bd_success ?
866                                   "truncated" : "network error on",
867                                   desc->bd_nob_transferred, desc->bd_nob);
868                         /* XXX should this be a different errno? */
869                         rc = -ETIMEDOUT;
870                 }
871         } else {
872                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
873         }
874         if (rc)
875                 GOTO(cleanup_lwi, rc);
876         rc = mdt_write_dir_page(info, page);
877
878 cleanup_lwi:
879         OBD_FREE_PTR(lwi);
880 cleanup_page:
881         __free_pages(page, 0);
882 desc_cleanup:
883         ptlrpc_free_bulk(desc);
884         RETURN(rc);
885 }
886 #endif
887
888 static int mdt_readpage(struct mdt_thread_info *info)
889 {
890         struct mdt_object *object = info->mti_object;
891         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
892         struct mdt_body   *reqbody;
893         struct mdt_body   *repbody;
894         int                rc, rc1 = 0;
895         int                i;
896         ENTRY;
897
898         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
899                 RETURN(-ENOMEM);
900
901         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
902         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
903         if (reqbody == NULL || repbody == NULL)
904                 RETURN(-EFAULT);
905
906         rc = mdt_init_ucred(info, reqbody);
907         if (rc)
908                 RETURN(rc);
909
910         /*
911          * prepare @rdpg before calling lower layers and transfer itself. Here
912          * reqbody->size contains offset of where to start to read and
913          * reqbody->nlink contains number bytes to read.
914          */
915         rdpg->rp_hash = reqbody->size;
916         if ((__u64)rdpg->rp_hash != reqbody->size) {
917                 CERROR("Invalid hash: %#llx != %#llx\n",
918                        (__u64)rdpg->rp_hash, reqbody->size);
919                 GOTO(out, rc = -EFAULT);
920         }
921         rdpg->rp_count  = reqbody->nlink;
922         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
923         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
924         if (rdpg->rp_pages == NULL)
925                 GOTO(out, rc = -ENOMEM);
926
927         for (i = 0; i < rdpg->rp_npages; ++i) {
928                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
929                 if (rdpg->rp_pages[i] == NULL)
930                         GOTO(free_rdpg, rc = -ENOMEM);
931         }
932
933         /* call lower layers to fill allocated pages with directory data */
934         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
935         if (rc) {
936                 if (rc == -ERANGE)
937                         rc1 = rc;
938                else
939                         GOTO(free_rdpg, rc);
940         }
941
942         /* send pages to client */
943         rc = mdt_sendpage(info, rdpg);
944
945         EXIT;
946 free_rdpg:
947
948         for (i = 0; i < rdpg->rp_npages; i++)
949                 if (rdpg->rp_pages[i] != NULL)
950                         __free_pages(rdpg->rp_pages[i], 0);
951         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
952
953         mdt_exit_ucred(info);
954         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
955
956 out:
957         mdt_exit_ucred(info);
958         return rc ? rc : rc1;
959 }
960
961 static int mdt_reint_internal(struct mdt_thread_info *info,
962                               struct mdt_lock_handle *lhc,
963                               __u32 op)
964 {
965         struct req_capsule      *pill = &info->mti_pill;
966         struct mdt_device       *mdt = info->mti_mdt;
967         struct ptlrpc_request   *req = mdt_info_req(info);
968         int                      rc;
969         ENTRY;
970
971         /* pack reply */
972         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
973                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
974                                      mdt->mdt_max_mdsize);
975         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
976                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
977                                      mdt->mdt_max_cookiesize);
978         rc = req_capsule_pack(pill);
979         if (rc != 0) {
980                 CERROR("Can't pack response, rc %d\n", rc);
981                 RETURN(rc);
982         }
983
984         /*
985          * Check this after packing response, because after we fail here without
986          * allocating response, caller anyway may want to get ldlm_reply from it
987          * and will get oops.
988          */
989         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
990                 RETURN(-EFAULT);
991
992         rc = mdt_reint_unpack(info, op);
993         if (rc != 0) {
994                 CERROR("Can't unpack reint, rc %d\n", rc);
995                 RETURN(rc);
996         }
997
998         rc = mdt_init_ucred_reint(info);
999         if (rc)
1000                 RETURN(rc);
1001
1002         rc = mdt_fix_attr_ucred(info, op);
1003         if (rc != 0)
1004                 GOTO(out, rc);
1005
1006         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1007                 struct mdt_client_data *mcd;
1008
1009                 mcd = req->rq_export->exp_mdt_data.med_mcd;
1010                 if (mcd->mcd_last_xid == req->rq_xid ||
1011                     mcd->mcd_last_close_xid == req->rq_xid) {
1012                         mdt_reconstruct(info, lhc);
1013                         rc = lustre_msg_get_status(req->rq_repmsg);
1014                         GOTO(out, rc);
1015                 }
1016                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
1017                           mcd->mcd_last_xid);
1018         }
1019         rc = mdt_reint_rec(info, lhc);
1020
1021 out:
1022         mdt_exit_ucred(info);
1023         RETURN(rc);
1024 }
1025
1026 static long mdt_reint_opcode(struct mdt_thread_info *info,
1027                              const struct req_format **fmt)
1028 {
1029         __u32 *ptr;
1030         long opc;
1031
1032         opc = -EFAULT;
1033         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
1034         if (ptr != NULL) {
1035                 opc = *ptr;
1036                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
1037                 if (opc < REINT_MAX && fmt[opc] != NULL)
1038                         req_capsule_extend(&info->mti_pill, fmt[opc]);
1039                 else
1040                         CERROR("Unsupported opc: %ld\n", opc);
1041         }
1042         return opc;
1043 }
1044
1045 static int mdt_reint(struct mdt_thread_info *info)
1046 {
1047         long opc;
1048         int  rc;
1049
1050         static const struct req_format *reint_fmts[REINT_MAX] = {
1051                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
1052                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
1053                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
1054                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
1055                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
1056                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
1057         };
1058
1059         ENTRY;
1060
1061         opc = mdt_reint_opcode(info, reint_fmts);
1062         if (opc >= 0) {
1063                 /*
1064                  * No lock possible here from client to pass it to reint code
1065                  * path.
1066                  */
1067                 rc = mdt_reint_internal(info, NULL, opc);
1068         } else {
1069                 rc = opc;
1070         }
1071
1072         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1073         RETURN(rc);
1074 }
1075
/* TODO: these two methods are not implemented yet. */
1077
/*
 * Placeholder for syncing the whole device: nothing is done yet and 0 is
 * always returned.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        return 0;
}
1083
/*
 * Placeholder for syncing a single object: nothing is done yet and 0 is
 * always returned.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        return 0;
}
1089
1090 static int mdt_sync(struct mdt_thread_info *info)
1091 {
1092         struct req_capsule *pill = &info->mti_pill;
1093         struct mdt_body *body;
1094         int rc;
1095         ENTRY;
1096
1097         /* The fid may be zero, so we req_capsule_set manually */
1098         req_capsule_set(pill, &RQF_MDS_SYNC);
1099
1100         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1101         if (body == NULL)
1102                 RETURN(-EINVAL);
1103
1104         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1105                 RETURN(-ENOMEM);
1106
1107         if (fid_seq(&body->fid1) == 0) {
1108                 /* sync the whole device */
1109                 rc = req_capsule_pack(pill);
1110                 if (rc == 0)
1111                         rc = mdt_device_sync(info);
1112         } else {
1113                 /* sync an object */
1114                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1115                 if (rc == 0) {
1116                         rc = mdt_object_sync(info);
1117                         if (rc == 0) {
1118                                 struct md_object *next;
1119                                 const struct lu_fid *fid;
1120                                 struct lu_attr *la = &info->mti_attr.ma_attr;
1121
1122                                 next = mdt_object_child(info->mti_object);
1123                                 info->mti_attr.ma_need = MA_INODE;
1124                                 rc = mo_attr_get(info->mti_env, next,
1125                                                  &info->mti_attr);
1126                                 if (rc == 0) {
1127                                         body = req_capsule_server_get(pill,
1128                                                                 &RMF_MDT_BODY);
1129                                         fid = mdt_object_fid(info->mti_object);
1130                                         mdt_pack_attr2body(body, la, fid);
1131                                         mdt_body_reverse_idmap(info, body);
1132                                 }
1133                         }
1134                 }
1135         }
1136         RETURN(rc);
1137 }
1138
/* Quotacheck handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_quotacheck_handle(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
1143
/* Quotactl handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_quotactl_handle(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
1148
1149 static int mdt_renew_capa(struct mdt_thread_info *info)
1150 {
1151         struct mdt_device *mdt = info->mti_mdt;
1152         struct mdt_object *obj = info->mti_object;
1153         struct mdt_body *body;
1154         struct lustre_capa *capa;
1155         int rc;
1156         ENTRY;
1157
1158         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
1159         LASSERT(body);
1160
1161         capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
1162         LASSERT(capa);
1163
1164         spin_lock(&capa_lock);
1165         info->mti_capa_key = *red_capa_key(mdt);
1166         spin_unlock(&capa_lock);
1167
1168         *capa = obj->mot_header.loh_capa;
1169         /* TODO: add capa check */
1170         rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa);
1171         if (rc)
1172                 RETURN(rc);
1173
1174         RETURN(rc);
1175 }
1176
1177 /*
1178  * OBD PING and other handlers.
1179  */
1180 static int mdt_obd_ping(struct mdt_thread_info *info)
1181 {
1182         int rc;
1183         ENTRY;
1184         rc = target_handle_ping(mdt_info_req(info));
1185         RETURN(rc);
1186 }
1187
/* Log cancel handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_obd_log_cancel(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
1192
/* Quota check callback handler: not supported, always returns -EOPNOTSUPP. */
static int mdt_obd_qc_callback(struct mdt_thread_info *info)
{
        return -EOPNOTSUPP;
}
1197
1198
1199 /*
1200  * DLM handlers.
1201  */
1202
1203 static struct ldlm_callback_suite cbs = {
1204         .lcs_completion = ldlm_server_completion_ast,
1205         .lcs_blocking   = ldlm_server_blocking_ast,
1206         .lcs_glimpse    = NULL
1207 };
1208
1209 static int mdt_enqueue(struct mdt_thread_info *info)
1210 {
1211         struct ptlrpc_request *req;
1212         int rc;
1213
1214         /*
1215          * info->mti_dlm_req already contains swapped and (if necessary)
1216          * converted dlm request.
1217          */
1218         LASSERT(info->mti_dlm_req != NULL);
1219
1220         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
1221                 info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
1222                 return 0;
1223         }
1224
1225         req = mdt_info_req(info);
1226         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1227                                   req, info->mti_dlm_req, &cbs);
1228         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1229         return rc ? : req->rq_status;
1230 }
1231
1232 static int mdt_convert(struct mdt_thread_info *info)
1233 {
1234         int rc;
1235         struct ptlrpc_request *req;
1236
1237         LASSERT(info->mti_dlm_req);
1238         req = mdt_info_req(info);
1239         rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1240         return rc ? : req->rq_status;
1241 }
1242
/*
 * Blocking callback handler. Such callbacks are not expected on the MDS:
 * log an error and LBUG(); -EOPNOTSUPP is the nominal return value.
 */
static int mdt_bl_callback(struct mdt_thread_info *info)
{
        CERROR("bl callbacks should not happen on MDS\n");
        LBUG();

        return -EOPNOTSUPP;
}
1249
/*
 * Completion callback handler. Such callbacks are not expected on the MDS:
 * log an error and LBUG(); -EOPNOTSUPP is the nominal return value.
 */
static int mdt_cp_callback(struct mdt_thread_info *info)
{
        CERROR("cp callbacks should not happen on MDS\n");
        LBUG();

        return -EOPNOTSUPP;
}
1256
1257 /*
1258  * sec context handlers
1259  */
/* Security context handler: delegates to mdt_handle_idmap(). */
static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
{
        return mdt_handle_idmap(info);
}
1264
1265 static struct mdt_object *mdt_obj(struct lu_object *o)
1266 {
1267         LASSERT(lu_device_is_mdt(o->lo_dev));
1268         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
1269 }
1270
1271 struct mdt_object *mdt_object_find(const struct lu_env *env,
1272                                    struct mdt_device *d,
1273                                    const struct lu_fid *f,
1274                                    struct lustre_capa *c)
1275 {
1276         struct lu_object *o;
1277         struct mdt_object *m;
1278         ENTRY;
1279
1280         if (!d->mdt_opts.mo_mds_capa)
1281                 c = BYPASS_CAPA;
1282
1283         o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f, c);
1284         if (IS_ERR(o))
1285                 m = (struct mdt_object *)o;
1286         else
1287                 m = mdt_obj(o);
1288         RETURN(m);
1289 }
1290
1291 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1292                     struct mdt_lock_handle *lh, __u64 ibits)
1293 {
1294         ldlm_policy_data_t *policy = &info->mti_policy;
1295         struct ldlm_res_id *res_id = &info->mti_res_id;
1296         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1297         int rc;
1298         ENTRY;
1299
1300         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1301         LASSERT(lh->mlh_mode != LCK_MINMODE);
1302         if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1303                 LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1304                 LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1305         }
1306         policy->l_inodebits.bits = ibits;
1307
1308         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
1309                       policy, res_id);
1310         RETURN(rc);
1311 }
1312
1313 /* lock with cross-ref fixes */
1314 int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
1315                        struct mdt_lock_handle *lh, __u64 ibits)
1316 {
1317         if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1318                 /* cross-ref object fix */
1319                 ibits &= ~MDS_INODELOCK_UPDATE;
1320                 ibits |= MDS_INODELOCK_LOOKUP;
1321         }
1322         return mdt_object_lock(info, o, lh, ibits);
1323 }
1324
1325 /*
1326  * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1327  * to save this lock in req.  when transaction committed, req will be released,
1328  * and lock will, too.
1329  */
1330 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1331                        struct mdt_lock_handle *lh, int decref)
1332 {
1333         struct ptlrpc_request *req    = mdt_info_req(info);
1334         struct lustre_handle  *handle = &lh->mlh_lh;
1335         ldlm_mode_t            mode   = lh->mlh_mode;
1336         ENTRY;
1337
1338         if (lustre_handle_is_used(handle)) {
1339                 if (decref)
1340                         fid_unlock(mdt_object_fid(o), handle, mode);
1341                 else
1342                         ptlrpc_save_lock(req, handle, mode);
1343                 handle->cookie = 0;
1344         }
1345         EXIT;
1346 }
1347
1348 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1349                                         const struct lu_fid *f,
1350                                         struct mdt_lock_handle *lh,
1351                                         __u64 ibits,
1352                                         struct lustre_capa *capa)
1353 {
1354         struct mdt_object *o;
1355
1356         o = mdt_object_find(info->mti_env, info->mti_mdt, f, capa);
1357         if (!IS_ERR(o)) {
1358                 int rc;
1359
1360                 rc = mdt_object_lock(info, o, lh, ibits);
1361                 if (rc != 0) {
1362                         mdt_object_put(info->mti_env, o);
1363                         o = ERR_PTR(rc);
1364                 }
1365         }
1366         return o;
1367 }
1368
1369 void mdt_object_unlock_put(struct mdt_thread_info * info,
1370                            struct mdt_object * o,
1371                            struct mdt_lock_handle *lh,
1372                            int decref)
1373 {
1374         mdt_object_unlock(info, o, lh, decref);
1375         mdt_object_put(info->mti_env, o);
1376 }
1377
1378 static struct mdt_handler *mdt_handler_find(__u32 opc,
1379                                             struct mdt_opc_slice *supported)
1380 {
1381         struct mdt_opc_slice *s;
1382         struct mdt_handler   *h;
1383
1384         h = NULL;
1385         for (s = supported; s->mos_hs != NULL; s++) {
1386                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1387                         h = s->mos_hs + (opc - s->mos_opc_start);
1388                         if (h->mh_opc != 0)
1389                                 LASSERT(h->mh_opc == opc);
1390                         else
1391                                 h = NULL; /* unsupported opc */
1392                         break;
1393                 }
1394         }
1395         return h;
1396 }
1397
1398 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1399 {
1400         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
1401 }
1402
1403 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1404 {
1405         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
1406 }
1407
/*
 * Resource name compatibility hook for an incoming dlm request.
 * Not implemented yet: does nothing and returns 0.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        /* XXX something... later. */
        return 0;
}
1414
/*
 * Resource name compatibility hook for an outgoing dlm reply.
 * Not implemented yet: does nothing and returns 0.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        /* XXX something... later. */
        return 0;
}
1420
1421 /*
1422  * Generic code handling requests that have struct mdt_body passed in:
1423  *
1424  *  - extract mdt_body from request and save it in @info, if present;
1425  *
1426  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1427  *  @info;
1428  *
1429  *  - if HABEO_CORPUS flag is set for this request type check whether object
1430  *  actually exists on storage (lu_object_exists()).
1431  *
1432  */
1433 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1434 {
1435         struct lustre_capa       *capa = NULL;
1436         const struct mdt_body    *body;
1437         struct mdt_object        *obj;
1438         const struct lu_env      *env;
1439         struct req_capsule       *pill;
1440         int                       rc;
1441
1442         env = info->mti_env;
1443         pill = &info->mti_pill;
1444
1445         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1446         if (body == NULL)
1447                 return -EFAULT;
1448
1449         if (!fid_is_sane(&body->fid1)) {
1450                 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1451                 return -EINVAL;
1452         }
1453
1454         /*
1455          * Dot not get size or any capa fields before we check that request
1456          * contains capa actually. There are some requests which do not, for
1457          * instance MDS_IS_SUBDIR.
1458          */
1459         if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT))
1460                 capa = req_capsule_client_get(pill, &RMF_CAPA1);
1461         
1462         obj = mdt_object_find(env, info->mti_mdt, &body->fid1, capa);
1463         if (!IS_ERR(obj)) {
1464                 if ((flags & HABEO_CORPUS) &&
1465                     !lu_object_exists(&obj->mot_obj.mo_lu)) {
1466                         mdt_object_put(env, obj);
1467                         rc = -ENOENT;
1468                 } else {
1469                         info->mti_object = obj;
1470                         rc = 0;
1471                 }
1472         } else
1473                 rc = PTR_ERR(obj);
1474
1475         return rc;
1476 }
1477
1478 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1479 {
1480         struct req_capsule *pill;
1481         int rc;
1482
1483         ENTRY;
1484         pill = &info->mti_pill;
1485
1486         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1487                 rc = mdt_body_unpack(info, flags);
1488         else
1489                 rc = 0;
1490
1491         if (rc == 0 && (flags & HABEO_REFERO)) {
1492                 struct mdt_device       *mdt = info->mti_mdt;
1493                 /*pack reply*/
1494                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1495                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1496                                              mdt->mdt_max_mdsize);
1497                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1498                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1499                                              mdt->mdt_max_cookiesize);
1500
1501                 rc = req_capsule_pack(pill);
1502         }
1503         RETURN(rc);
1504 }
1505
1506 #if 0
1507 struct lu_context_key mdt_txn_key;
1508 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1509 {
1510         struct mdt_device     *mdt = info->mti_mdt;
1511         struct ptlrpc_request *req = mdt_info_req(info);
1512         struct obd_export     *exp = req->rq_export;
1513
1514         /* sometimes the reply message has not been successfully packed */
1515         if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1516                 return;
1517
1518         if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1519                 return;
1520
1521         /*XXX: assert on this when all code will be finished */
1522         if (rc != 0 && info->mti_transno != 0) {
1523                 info->mti_transno = 0;
1524                 CERROR("Transno is not 0 while rc is %i!\n", rc);
1525         }
1526
1527         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1528                info->mti_transno, exp->exp_obd->obd_last_committed);
1529
1530         spin_lock(&mdt->mdt_transno_lock);
1531         req->rq_transno = info->mti_transno;
1532         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1533
1534         target_committed_to_req(req);
1535
1536         spin_unlock(&mdt->mdt_transno_lock);
1537         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1538         //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1539 }
1540 #endif
1541
1542
1543 /*
1544  * Invoke handler for this request opc. Also do necessary preprocessing
1545  * (according to handler ->mh_flags), and post-processing (setting of
1546  * ->last_{xid,committed}).
1547  */
1548 static int mdt_req_handle(struct mdt_thread_info *info,
1549                           struct mdt_handler *h, struct ptlrpc_request *req)
1550 {
1551         int   rc;
1552         __u32 flags;
1553
1554         ENTRY;
1555
1556         LASSERT(h->mh_act != NULL);
1557         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1558         LASSERT(current->journal_info == NULL);
1559
1560         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1561
1562         /*
1563          * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1564          * correct handling of failed req later in ldlm due to doing
1565          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1566          * correct actions like it is done in target_send_reply_msg().
1567          */
1568         if (h->mh_fail_id != 0) {
1569                 /*
1570                  * Set to info->mti_fail_id to handler fail_id, it will be used
1571                  * later, and better than use default fail_id.
1572                  */
1573                 if (OBD_FAIL_CHECK(h->mh_fail_id)) {
1574                         info->mti_fail_id = h->mh_fail_id;
1575                         RETURN(0);
1576                 }
1577         }
1578
1579         rc = 0;
1580         flags = h->mh_flags;
1581         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1582
1583         if (h->mh_fmt != NULL) {
1584                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1585                 rc = mdt_unpack_req_pack_rep(info, flags);
1586         }
1587
1588         if (rc == 0 && flags & MUTABOR &&
1589             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1590                 rc = -EROFS;
1591
1592         if (rc == 0 && flags & HABEO_CLAVIS) {
1593                 struct ldlm_request *dlm_req;
1594
1595                 LASSERT(h->mh_fmt != NULL);
1596
1597                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1598                 if (dlm_req != NULL) {
1599                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1600                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1601                                                              dlm_req);
1602                         info->mti_dlm_req = dlm_req;
1603                 } else {
1604                         CERROR("Can't unpack dlm request\n");
1605                         rc = -EFAULT;
1606                 }
1607         }
1608
1609         if (rc == 0)
1610                 /*
1611                  * Process request.
1612                  */
1613                 rc = h->mh_act(info);
1614
1615         req->rq_status = rc;
1616
1617         /*
1618          * It is not correct to zero @rc out here unconditionally. First of all,
1619          * for error cases, we do not need target_committed_to_req(req). Second
1620          * reason is that, @rc is passed to target_send_reply() and used for
1621          * figuring out what should be done about reply in capricular case. We
1622          * only zero it out for ELDLM_* codes which > 0 because they do not
1623          * support invariant of marking req as difficult only in case of error.
1624          */
1625         if (rc > 0)
1626                 rc = 0;
1627
1628         LASSERT(current->journal_info == NULL);
1629
1630         if (rc == 0 && (flags & HABEO_CLAVIS)
1631             && info->mti_mdt->mdt_opts.mo_compat_resname) {
1632                 struct ldlm_reply *dlmrep;
1633
1634                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1635                 if (dlmrep != NULL)
1636                         rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1637         }
1638
1639         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1640         if (rc == 0 && h->mh_opc != MDS_DISCONNECT)
1641                 target_committed_to_req(req);
1642
1643         RETURN(rc);
1644 }
1645
1646 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1647 {
1648         lh->mlh_lh.cookie = 0ull;
1649         lh->mlh_mode = LCK_MINMODE;
1650 }
1651
/*
 * Finalize lock handle @lh. Nothing is released here: the function only
 * asserts that lustre_handle_is_used() reports the embedded handle as unused.
 */
1652 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1653 {
1654         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1655 }
1656
1657 static void mdt_thread_info_init(struct ptlrpc_request *req,
1658                                  struct mdt_thread_info *info)
1659 {
1660         int i;
1661
1662         memset(info, 0, sizeof(*info));
1663
1664         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1665         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1666                 info->mti_rep_buf_size[i] = -1;
1667
1668         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1669                 mdt_lock_handle_init(&info->mti_lh[i]);
1670
1671         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1672         info->mti_env = req->rq_svc_thread->t_env;
1673         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1674
1675         /* it can be NULL while CONNECT */
1676         if (req->rq_export)
1677                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1678         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1679                          info->mti_rep_buf_size);
1680 }
1681
1682 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1683 {
1684         int i;
1685
1686         req_capsule_fini(&info->mti_pill);
1687         if (info->mti_object != NULL) {
1688                 mdt_object_put(info->mti_env, info->mti_object);
1689                 info->mti_object = NULL;
1690         }
1691         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1692                 mdt_lock_handle_fini(&info->mti_lh[i]);
1693 }
1694
1695 /* mds/handler.c */
1696 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1697                                        struct obd_device *obd, int *process);
1698 /*
1699  * Handle recovery. Return:
1700  *        +1: continue request processing;
1701  *       -ve: abort immediately with the given error code;
1702  *         0: send reply with error code in req->rq_status;
1703  */
1704 static int mdt_recovery(struct mdt_thread_info *info)
1705 {
1706         struct ptlrpc_request *req = mdt_info_req(info);
1707         int recovering;
1708         int abort_recovery;
1709         struct obd_device *obd;
1710
1711         ENTRY;
1712
1713         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1714         case MDS_CONNECT:
1715         case SEC_CTX_INIT:
1716         case SEC_CTX_INIT_CONT:
1717         case SEC_CTX_FINI:
1718                 mdt_handle_idmap(info);
1719                 RETURN(+1);
1720         }
1721
1722         if (req->rq_export == NULL) {
1723                 CERROR("operation %d on unconnected MDS from %s\n",
1724                        lustre_msg_get_opc(req->rq_reqmsg),
1725                        libcfs_id2str(req->rq_peer));
1726                 req->rq_status = -ENOTCONN;
1727                 RETURN(-ENOTCONN);
1728         }
1729
1730         /* sanity check: if the xid matches, the request must be marked as a
1731          * resent or replayed */
1732         if (req->rq_xid == req_exp_last_xid(req) ||
1733             req->rq_xid == req_exp_last_close_xid(req)) {
1734                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1735                       (MSG_RESENT | MSG_REPLAY))) {
1736                         CERROR("rq_xid "LPU64" matches last_xid, "
1737                                 "expected RESENT flag\n", req->rq_xid);
1738                         req->rq_status = -ENOTCONN;
1739                         RETURN(-ENOTCONN);
1740                 }
1741         }
1742
1743         /* else: note the opposite is not always true; a RESENT req after a
1744          * failover will usually not match the last_xid, since it was likely
1745          * never committed. A REPLAYed request will almost never match the
1746          * last xid, however it could for a committed, but still retained,
1747          * open. */
1748
1749         obd = req->rq_export->exp_obd;
1750
1751         /* Check for aborted recovery... */
1752         spin_lock_bh(&obd->obd_processing_task_lock);
1753         abort_recovery = obd->obd_abort_recovery;
1754         recovering = obd->obd_recovering;
1755         spin_unlock_bh(&obd->obd_processing_task_lock);
1756         if (abort_recovery) {
1757                 target_abort_recovery(obd);
1758         } else if (recovering) {
1759                 int rc;
1760                 int should_process;
1761
1762                 rc = mds_filter_recovery_request(req, obd, &should_process);
1763                 if (rc != 0 || !should_process)
1764                         RETURN(rc);
1765         }
1766         RETURN(+1);
1767 }
1768
1769 static int mdt_reply(struct ptlrpc_request *req, int rc,
1770                      struct mdt_thread_info *info)
1771 {
1772         struct obd_device *obd;
1773         ENTRY;
1774
1775         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1776                 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1777                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1778
1779                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1780                 if (obd && obd->obd_recovering) {
1781                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1782                         RETURN(target_queue_final_reply(req, rc));
1783                 } else {
1784                         /*
1785                          * Lost a race with recovery; let the error path DTRT.
1786                          */
1787                         rc = req->rq_status = -ENOTCONN;
1788                 }
1789         }
1790         target_send_reply(req, rc, info->mti_fail_id);
1791         RETURN(0);
1792 }
1793
1794 /* mds/handler.c */
1795 extern int mds_msg_check_version(struct lustre_msg *msg);
1796
1797 static int mdt_handle0(struct ptlrpc_request *req,
1798                        struct mdt_thread_info *info,
1799                        struct mdt_opc_slice *supported)
1800 {
1801         struct mdt_handler *h;
1802         struct lustre_msg  *msg;
1803         int                 rc;
1804
1805         ENTRY;
1806
1807         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1808
1809         LASSERT(current->journal_info == NULL);
1810
1811         msg = req->rq_reqmsg;
1812         rc = mds_msg_check_version(msg);
1813         if (rc == 0) {
1814                 rc = mdt_recovery(info);
1815                 if (rc == +1) {
1816                         h = mdt_handler_find(lustre_msg_get_opc(msg),
1817                                              supported);
1818                         if (h != NULL) {
1819                                 rc = mdt_req_handle(info, h, req);
1820                         } else {
1821                                 req->rq_status = -ENOTSUPP;
1822                                 rc = ptlrpc_error(req);
1823                                 RETURN(rc);
1824                         }
1825                         rc = mdt_reply(req, rc, info);
1826                 }
1827         } else
1828                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1829         RETURN(rc);
1830 }
1831
1832 /*
1833  * MDT handler function called by ptlrpc service thread when request comes.
1834  *
1835  * XXX common "target" functionality should be factored into separate module
1836  * shared by mdt, ost and stand-alone services like fld.
1837  */
1838 static int mdt_handle_common(struct ptlrpc_request *req,
1839                              struct mdt_opc_slice *supported)
1840 {
1841         struct lu_env          *env;
1842         struct mdt_thread_info *info;
1843         int                     rc;
1844         ENTRY;
1845
1846         env = req->rq_svc_thread->t_env;
1847         LASSERT(env != NULL);
1848         LASSERT(env->le_ses != NULL);
1849         LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
1850         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
1851         LASSERT(info != NULL);
1852
1853         mdt_thread_info_init(req, info);
1854
1855         rc = mdt_handle0(req, info, supported);
1856
1857         mdt_thread_info_fini(info);
1858         RETURN(rc);
1859 }
1860
1861 /*
1862  * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ
1863  * as well.
1864  */
1865 static int mdt_recovery_handle(struct ptlrpc_request *req)
1866 {
1867         int rc;
1868         ENTRY;
1869
1870         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1871         case FLD_QUERY:
1872                 rc = mdt_handle_common(req, mdt_fld_handlers);
1873                 break;
1874         case SEQ_QUERY:
1875                 rc = mdt_handle_common(req, mdt_seq_handlers);
1876                 break;
1877         default:
1878                 rc = mdt_handle_common(req, mdt_regular_handlers);
1879                 break;
1880         }
1881
1882         RETURN(rc);
1883 }
1884
1885 static int mdt_regular_handle(struct ptlrpc_request *req)
1886 {
1887         return mdt_handle_common(req, mdt_regular_handlers);
1888 }
1889
1890 static int mdt_readpage_handle(struct ptlrpc_request *req)
1891 {
1892         return mdt_handle_common(req, mdt_readpage_handlers);
1893 }
1894
1895 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1896 {
1897         return mdt_handle_common(req, mdt_seq_handlers);
1898 }
1899
1900 static int mdt_mdss_handle(struct ptlrpc_request *req)
1901 {
1902         return mdt_handle_common(req, mdt_seq_handlers);
1903 }
1904
1905 static int mdt_dtss_handle(struct ptlrpc_request *req)
1906 {
1907         return mdt_handle_common(req, mdt_seq_handlers);
1908 }
1909
1910 static int mdt_fld_handle(struct ptlrpc_request *req)
1911 {
1912         return mdt_handle_common(req, mdt_fld_handlers);
1913 }
1914
/*
 * Internal intent codes. mdt_intent_code() translates the IT_* value carried
 * by a request into one of these, and the result is used as an index into
 * mdt_it_flavor[].
 */
1915 enum mdt_it_code {
1916         MDT_IT_OPEN,            /* IT_OPEN */
1917         MDT_IT_OCREAT,          /* IT_OPEN|IT_CREAT */
1918         MDT_IT_CREATE,          /* IT_CREAT */
1919         MDT_IT_GETATTR,         /* IT_GETATTR */
1920         MDT_IT_READDIR,         /* IT_READDIR */
1921         MDT_IT_LOOKUP,          /* IT_LOOKUP */
1922         MDT_IT_UNLINK,          /* IT_UNLINK */
1923         MDT_IT_TRUNC,           /* IT_TRUNC */
1924         MDT_IT_GETXATTR,        /* IT_GETXATTR */
1925         MDT_IT_NR               /* last enumerator, not an intent itself */
1926 };
1927
1928 static int mdt_intent_getattr(enum mdt_it_code opcode,
1929                               struct mdt_thread_info *info,
1930                               struct ldlm_lock **,
1931                               int);
1932 static int mdt_intent_reint(enum mdt_it_code opcode,
1933                             struct mdt_thread_info *info,
1934                             struct ldlm_lock **,
1935                             int);
1936
/*
 * Per-intent description table, indexed by enum mdt_it_code.
 * mdt_intent_opc() looks the entry up and drives request unpacking and
 * policy execution from it.
 */
1937 static struct mdt_it_flavor {
        /* request format the capsule is extended to; NULL means no extension */
1938         const struct req_format *it_fmt;
        /* flags passed to mdt_unpack_req_pack_rep(); MUTABOR entries are
         * refused with -EROFS on exports connected read-only */
1939         __u32                    it_flags;
        /* policy function executing the intent; NULL if there is none */
1940         int                    (*it_act)(enum mdt_it_code ,
1941                                          struct mdt_thread_info *,
1942                                          struct ldlm_lock **,
1943                                          int);
        /* reint opcode that mdt_intent_reint() requires the request to carry */
1944         long                     it_reint;
1945 } mdt_it_flavor[] = {
1946         [MDT_IT_OPEN]     = {
1947                 .it_fmt   = &RQF_LDLM_INTENT,
1948                 /*.it_flags = HABEO_REFERO,*/
1949                 .it_flags = 0,
1950                 .it_act   = mdt_intent_reint,
1951                 .it_reint = REINT_OPEN
1952         },
1953         [MDT_IT_OCREAT]   = {
1954                 .it_fmt   = &RQF_LDLM_INTENT,
1955                 .it_flags = MUTABOR,
1956                 .it_act   = mdt_intent_reint,
1957                 .it_reint = REINT_OPEN
1958         },
1959         [MDT_IT_CREATE]   = {
1960                 .it_fmt   = &RQF_LDLM_INTENT,
1961                 .it_flags = MUTABOR,
1962                 .it_act   = mdt_intent_reint,
1963                 .it_reint = REINT_CREATE
1964         },
1965         [MDT_IT_GETATTR]  = {
1966                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1967                 .it_flags = HABEO_REFERO,
1968                 .it_act   = mdt_intent_getattr
1969         },
        /* no format and no policy function is registered for READDIR */
1970         [MDT_IT_READDIR]  = {
1971                 .it_fmt   = NULL,
1972                 .it_flags = 0,
1973                 .it_act   = NULL
1974         },
1975         [MDT_IT_LOOKUP]   = {
1976                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1977                 .it_flags = HABEO_REFERO,
1978                 .it_act   = mdt_intent_getattr
1979         },
1980         [MDT_IT_UNLINK]   = {
1981                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
1982                 .it_flags = MUTABOR,
1983                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
1984                 .it_reint = REINT_UNLINK
1985         },
        /* no format and no policy function is registered for TRUNC */
1986         [MDT_IT_TRUNC]    = {
1987                 .it_fmt   = NULL,
1988                 .it_flags = MUTABOR,
1989                 .it_act   = NULL
1990         },
        /* no format and no policy function is registered for GETXATTR */
1991         [MDT_IT_GETXATTR] = {
1992                 .it_fmt   = NULL,
1993                 .it_flags = 0,
1994                 .it_act   = NULL
1995         }
1996 };
1997
1998 int mdt_intent_lock_replace(struct mdt_thread_info *info,
1999                             struct ldlm_lock **lockp,
2000                             struct ldlm_lock *new_lock,
2001                             struct mdt_lock_handle *lh,
2002                             int flags)
2003 {
2004         struct ptlrpc_request  *req = mdt_info_req(info);
2005         struct ldlm_lock       *lock = *lockp;
2006
2007         /*
2008          * Get new lock only for cases when possible resent did not find any
2009          * lock.
2010          */
2011         if (new_lock == NULL)
2012                 new_lock = ldlm_handle2lock(&lh->mlh_lh);
2013
2014         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2015                 RETURN(0);
2016
2017         LASSERTF(new_lock != NULL,
2018                  "lockh "LPX64"\n", lh->mlh_lh.cookie);
2019
2020         /*
2021          * If we've already given this lock to a client once, then we should
2022          * have no readers or writers.  Otherwise, we should have one reader
2023          * _or_ writer ref (which will be zeroed below) before returning the
2024          * lock to a client.
2025          */
2026         if (new_lock->l_export == req->rq_export) {
2027                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2028         } else {
2029                 LASSERT(new_lock->l_export == NULL);
2030                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2031         }
2032
2033         *lockp = new_lock;
2034
2035         if (new_lock->l_export == req->rq_export) {
2036                 /*
2037                  * Already gave this to the client, which means that we
2038                  * reconstructed a reply.
2039                  */
2040                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2041                         MSG_RESENT);
2042                 RETURN(ELDLM_LOCK_REPLACED);
2043         }
2044
2045         /* Fixup the lock to be given to the client */
2046         lock_res_and_lock(new_lock);
2047         new_lock->l_readers = 0;
2048         new_lock->l_writers = 0;
2049
2050         new_lock->l_export = class_export_get(req->rq_export);
2051         list_add(&new_lock->l_export_chain,
2052                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2053
2054         new_lock->l_blocking_ast = lock->l_blocking_ast;
2055         new_lock->l_completion_ast = lock->l_completion_ast;
2056         new_lock->l_remote_handle = lock->l_remote_handle;
2057         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2058
2059         unlock_res_and_lock(new_lock);
2060         LDLM_LOCK_PUT(new_lock);
2061         lh->mlh_lh.cookie = 0;
2062
2063         RETURN(ELDLM_LOCK_REPLACED);
2064 }
2065
2066 static void mdt_fixup_resent(struct req_capsule *pill,
2067                              struct ldlm_lock *new_lock,
2068                              struct ldlm_lock **old_lock,
2069                              struct mdt_lock_handle *lh)
2070 {
2071         struct ptlrpc_request  *req = pill->rc_req;
2072         struct obd_export      *exp = req->rq_export;
2073         struct lustre_handle    remote_hdl;
2074         struct ldlm_request    *dlmreq;
2075         struct list_head       *iter;
2076
2077         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2078                 return;
2079
2080         dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
2081         remote_hdl = dlmreq->lock_handle1;
2082
2083         spin_lock(&exp->exp_ldlm_data.led_lock);
2084         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2085                 struct ldlm_lock *lock;
2086                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2087                 if (lock == new_lock)
2088                         continue;
2089                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2090                         lh->mlh_lh.cookie = lock->l_handle.h_cookie;
2091                         lh->mlh_mode = lock->l_granted_mode;
2092
2093                         LDLM_DEBUG(lock, "restoring lock cookie");
2094                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2095                                   lh->mlh_lh.cookie);
2096                         if (old_lock)
2097                                 *old_lock = LDLM_LOCK_GET(lock);
2098                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2099                         return;
2100                 }
2101         }
2102         spin_unlock(&exp->exp_ldlm_data.led_lock);
2103
2104         /*
2105          * If the xid matches, then we know this is a resent request, and allow
2106          * it. (It's probably an OPEN, for which we don't send a lock.
2107          */
2108         if (req->rq_xid == req_exp_last_xid(req))
2109                 return;
2110
2111         if (req->rq_xid == req_exp_last_close_xid(req))
2112                 return;
2113
2114         /*
2115          * This remote handle isn't enqueued, so we never received or processed
2116          * this request.  Clear MSG_RESENT, because it can be handled like any
2117          * normal request now.
2118          */
2119         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2120
2121         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2122                   remote_hdl.cookie);
2123 }
2124
2125 static int mdt_intent_getattr(enum mdt_it_code opcode,
2126                               struct mdt_thread_info *info,
2127                               struct ldlm_lock **lockp,
2128                               int flags)
2129 {
2130         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2131         struct ldlm_lock       *new_lock = NULL;
2132         __u64                   child_bits;
2133         struct ldlm_reply      *ldlm_rep;
2134         struct ptlrpc_request  *req;
2135         struct mdt_body        *reqbody;
2136         int                     rc;
2137
2138         ENTRY;
2139
2140         switch (opcode) {
2141         case MDT_IT_LOOKUP:
2142                 child_bits = MDS_INODELOCK_LOOKUP;
2143                 break;
2144         case MDT_IT_GETATTR:
2145                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2146                 break;
2147         default:
2148                 CERROR("Unhandled till now");
2149                 RETURN(-EINVAL);
2150         }
2151
2152         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
2153         if (reqbody == NULL)
2154                 RETURN(-EFAULT);
2155
2156         rc = mdt_init_ucred(info, reqbody);
2157         if (rc)
2158                 RETURN(rc);
2159
2160         req = info->mti_pill.rc_req;
2161         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2162         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
2163
2164         /* Get lock from request for possible resent case. */
2165         mdt_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
2166
2167         ldlm_rep->lock_policy_res2 =
2168                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2169         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 1, 0);
2170
2171         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2172                 ldlm_rep->lock_policy_res2 = 0;
2173         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2174                     ldlm_rep->lock_policy_res2) {
2175                 GOTO(out, rc = ELDLM_LOCK_ABORTED);
2176         }
2177
2178         rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
2179         EXIT;
2180 out:
2181         mdt_exit_ucred(info);
2182         return rc;
2183 }
2184
2185 static int mdt_intent_reint(enum mdt_it_code opcode,
2186                             struct mdt_thread_info *info,
2187                             struct ldlm_lock **lockp,
2188                             int flags)
2189 {
2190         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2191         struct ldlm_reply      *rep;
2192         long                    opc;
2193         int                     rc;
2194
2195         static const struct req_format *intent_fmts[REINT_MAX] = {
2196                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
2197                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
2198         };
2199
2200         ENTRY;
2201
2202         opc = mdt_reint_opcode(info, intent_fmts);
2203         if (opc < 0)
2204                 RETURN(opc);
2205
2206         if (mdt_it_flavor[opcode].it_reint != opc) {
2207                 CERROR("Reint code %ld doesn't match intent: %d\n",
2208                        opc, opcode);
2209                 RETURN(-EPROTO);
2210         }
2211
2212         /* Get lock from request for possible resent case. */
2213         mdt_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
2214
2215         rc = mdt_reint_internal(info, lhc, opc);
2216
2217         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2218         if (rep == NULL)
2219                 RETURN(-EFAULT);
2220
2221         /* MDC expects this in any case */
2222         if (rc != 0)
2223                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
2224
2225         rep->lock_policy_res2 = rc;
2226
2227         /* cross-ref case, the lock should be returned to the client */
2228         if (rc == -EREMOTE) {
2229                 LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
2230                 rep->lock_policy_res2 = 0;
2231                 RETURN(mdt_intent_lock_replace(info, lockp, NULL, lhc, flags));
2232         }
2233         rep->lock_policy_res2 = rc;
2234
2235         RETURN(ELDLM_LOCK_ABORTED);
2236 }
2237
2238 static int mdt_intent_code(long itcode)
2239 {
2240         int rc;
2241
2242         switch(itcode) {
2243         case IT_OPEN:
2244                 rc = MDT_IT_OPEN;
2245                 break;
2246         case IT_OPEN|IT_CREAT:
2247                 rc = MDT_IT_OCREAT;
2248                 break;
2249         case IT_CREAT:
2250                 rc = MDT_IT_CREATE;
2251                 break;
2252         case IT_READDIR:
2253                 rc = MDT_IT_READDIR;
2254                 break;
2255         case IT_GETATTR:
2256                 rc = MDT_IT_GETATTR;
2257                 break;
2258         case IT_LOOKUP:
2259                 rc = MDT_IT_LOOKUP;
2260                 break;
2261         case IT_UNLINK:
2262                 rc = MDT_IT_UNLINK;
2263                 break;
2264         case IT_TRUNC:
2265                 rc = MDT_IT_TRUNC;
2266                 break;
2267         case IT_GETXATTR:
2268                 rc = MDT_IT_GETXATTR;
2269                 break;
2270         default:
2271                 CERROR("Unknown intent opcode: %ld\n", itcode);
2272                 rc = -EINVAL;
2273                 break;
2274         }
2275         return rc;
2276 }
2277
2278 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2279                           struct ldlm_lock **lockp, int flags)
2280 {
2281         struct req_capsule   *pill;
2282         struct mdt_it_flavor *flv;
2283         int opc;
2284         int rc;
2285         ENTRY;
2286
2287         opc = mdt_intent_code(itopc);
2288         if (opc < 0)
2289                 RETURN(-EINVAL);
2290
2291         pill = &info->mti_pill;
2292         flv  = &mdt_it_flavor[opc];
2293
2294         if (flv->it_fmt != NULL)
2295                 req_capsule_extend(pill, flv->it_fmt);
2296
2297         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2298         if (rc == 0) {
2299                 struct ptlrpc_request *req = mdt_info_req(info);
2300                 if (flv->it_flags & MUTABOR &&
2301                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2302                         rc = -EROFS;
2303         }
2304         if (rc == 0 && flv->it_act != NULL) {
2305                 /* execute policy */
2306                 rc = flv->it_act(opc, info, lockp, flags);
2307         } else
2308                 rc = -EOPNOTSUPP;
2309         RETURN(rc);
2310 }
2311
2312 static int mdt_intent_policy(struct ldlm_namespace *ns,
2313                              struct ldlm_lock **lockp, void *req_cookie,
2314                              ldlm_mode_t mode, int flags, void *data)
2315 {
2316         struct mdt_thread_info *info;
2317         struct ptlrpc_request  *req  =  req_cookie;
2318         struct ldlm_intent     *it;
2319         struct req_capsule     *pill;
2320         struct ldlm_lock       *lock = *lockp;
2321         int rc;
2322
2323         ENTRY;
2324
2325         LASSERT(req != NULL);
2326
2327         info = lu_context_key_get(&req->rq_svc_thread->t_env->le_ctx,
2328                                   &mdt_thread_key);
2329         LASSERT(info != NULL);
2330         pill = &info->mti_pill;
2331         LASSERT(pill->rc_req == req);
2332
2333         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2334                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2335                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2336                 if (it != NULL) {
2337                         LDLM_DEBUG(lock, "intent policy opc: %s\n",
2338                                    ldlm_it2str(it->opc));
2339
2340                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
2341                         if (rc == 0)
2342                                 rc = ELDLM_OK;
2343                 } else
2344                         rc = -EFAULT;
2345         } else {
2346                 /* No intent was provided */
2347                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2348                 rc = req_capsule_pack(pill);
2349         }
2350         RETURN(rc);
2351 }
2352
2353 /*
2354  * Seq wrappers
2355  */
2356 static int mdt_seq_fini(const struct lu_env *env,
2357                         struct mdt_device *m)
2358 {
2359         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2360         ENTRY;
2361
2362         if (ls && ls->ls_server_seq) {
2363                 seq_server_fini(ls->ls_server_seq, env);
2364                 OBD_FREE_PTR(ls->ls_server_seq);
2365                 ls->ls_server_seq = NULL;
2366         }
2367
2368         if (ls && ls->ls_control_seq) {
2369                 seq_server_fini(ls->ls_control_seq, env);
2370                 OBD_FREE_PTR(ls->ls_control_seq);
2371                 ls->ls_control_seq = NULL;
2372         }
2373
2374         if (ls && ls->ls_client_seq) {
2375                 seq_client_fini(ls->ls_client_seq);
2376                 OBD_FREE_PTR(ls->ls_client_seq);
2377                 ls->ls_client_seq = NULL;
2378         }
2379
2380         RETURN(0);
2381 }
2382
2383 static int mdt_seq_init(const struct lu_env *env,
2384                         const char *uuid,
2385                         struct mdt_device *m)
2386 {
2387         struct lu_site *ls;
2388         char *prefix;
2389         int rc;
2390         ENTRY;
2391
2392         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2393
2394         /*
2395          * This is sequence-controller node. Init seq-controller server on local
2396          * MDT.
2397          */
2398         if (ls->ls_node_id == 0) {
2399                 LASSERT(ls->ls_control_seq == NULL);
2400
2401                 OBD_ALLOC_PTR(ls->ls_control_seq);
2402                 if (ls->ls_control_seq == NULL)
2403                         RETURN(-ENOMEM);
2404
2405                 rc = seq_server_init(ls->ls_control_seq,
2406                                      m->mdt_bottom, uuid,
2407                                      LUSTRE_SEQ_CONTROLLER,
2408                                      env);
2409
2410                 if (rc)
2411                         GOTO(out_seq_fini, rc);
2412
2413                 OBD_ALLOC_PTR(ls->ls_client_seq);
2414                 if (ls->ls_client_seq == NULL)
2415                         GOTO(out_seq_fini, rc = -ENOMEM);
2416
2417                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2418                 if (prefix == NULL) {
2419                         OBD_FREE_PTR(ls->ls_client_seq);
2420                         GOTO(out_seq_fini, rc = -ENOMEM);
2421                 }
2422
2423                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2424                          uuid);
2425
2426                 /*
2427                  * Init seq-controller client after seq-controller server is
2428                  * ready. Pass ls->ls_control_seq to it for direct talking.
2429                  */
2430                 rc = seq_client_init(ls->ls_client_seq, NULL,
2431                                      LUSTRE_SEQ_METADATA, prefix,
2432                                      ls->ls_control_seq);
2433                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2434
2435                 if (rc)
2436                         GOTO(out_seq_fini, rc);
2437         }
2438
2439         /* Init seq-server on local MDT */
2440         LASSERT(ls->ls_server_seq == NULL);
2441
2442         OBD_ALLOC_PTR(ls->ls_server_seq);
2443         if (ls->ls_server_seq == NULL)
2444                 GOTO(out_seq_fini, rc = -ENOMEM);
2445
2446         rc = seq_server_init(ls->ls_server_seq,
2447                              m->mdt_bottom, uuid,
2448                              LUSTRE_SEQ_SERVER,
2449                              env);
2450         if (rc)
2451                 GOTO(out_seq_fini, rc = -ENOMEM);
2452
2453         /* Assign seq-controller client to local seq-server. */
2454         if (ls->ls_node_id == 0) {
2455                 LASSERT(ls->ls_client_seq != NULL);
2456
2457                 rc = seq_server_set_cli(ls->ls_server_seq,
2458                                         ls->ls_client_seq,
2459                                         env);
2460         }
2461
2462         EXIT;
2463 out_seq_fini:
2464         if (rc)
2465                 mdt_seq_fini(env, m);
2466
2467         return rc;
2468 }
2469
2470 static int mdt_md_connect(const struct lu_env *env,
2471                           struct lustre_handle *conn,
2472                           struct obd_device *mdc)
2473 {
2474         struct obd_connect_data *ocd;
2475         int rc;
2476
2477         OBD_ALLOC_PTR(ocd);
2478         if (!ocd)
2479                 RETURN(-ENOMEM);
2480         /* The connection between MDS must be local */
2481         ocd->ocd_connect_flags |= OBD_CONNECT_LCL_CLIENT;
2482         rc = obd_connect(env, conn, mdc, &mdc->obd_uuid, ocd);
2483
2484         OBD_FREE_PTR(ocd);
2485
2486         RETURN(rc);
2487 }
2488 /*
2489  * Init client sequence manager which is used by local MDS to talk to sequence
2490  * controller on remote node.
2491  */
2492 static int mdt_seq_init_cli(const struct lu_env *env,
2493                             struct mdt_device *m,
2494                             struct lustre_cfg *cfg)
2495 {
2496         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2497         struct obd_device *mdc;
2498         struct obd_uuid   *uuidp, *mdcuuidp;
2499         char              *uuid_str, *mdc_uuid_str;
2500         int               rc;
2501         int               index;
2502         struct mdt_thread_info *info;
2503         char *p, *index_string = lustre_cfg_string(cfg, 2);
2504         ENTRY;
2505
2506         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2507         uuidp = &info->mti_u.uuid[0];
2508         mdcuuidp = &info->mti_u.uuid[1];
2509
2510         LASSERT(index_string);
2511
2512         index = simple_strtol(index_string, &p, 10);
2513         if (*p) {
2514                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2515                 RETURN(-EINVAL);
2516         }
2517
2518         /* check if this is adding the first MDC and controller is not yet
2519          * initialized. */
2520         if (index != 0 || ls->ls_client_seq)
2521                 RETURN(0);
2522
2523         uuid_str = lustre_cfg_string(cfg, 1);
2524         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2525         obd_str2uuid(uuidp, uuid_str);
2526         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2527
2528         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2529         if (!mdc) {
2530                 CERROR("can't find controller MDC by uuid %s\n",
2531                        uuid_str);
2532                 rc = -ENOENT;
2533         } else if (!mdc->obd_set_up) {
2534                 CERROR("target %s not set up\n", mdc->obd_name);
2535                 rc = -EINVAL;
2536         } else {
2537                 struct lustre_handle conn = {0, };
2538
2539                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2540                        mdc->obd_name, mdc->obd_uuid.uuid);
2541
2542                 rc = mdt_md_connect(env, &conn, mdc);
2543                 if (rc) {
2544                         CERROR("target %s connect error %d\n",
2545                                mdc->obd_name, rc);
2546                 } else {
2547                         ls->ls_control_exp = class_conn2export(&conn);
2548
2549                         OBD_ALLOC_PTR(ls->ls_client_seq);
2550
2551                         if (ls->ls_client_seq != NULL) {
2552                                 char *prefix;
2553
2554                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2555                                 if (!prefix)
2556                                         RETURN(-ENOMEM);
2557
2558                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2559                                          mdc->obd_name);
2560
2561                                 rc = seq_client_init(ls->ls_client_seq,
2562                                                      ls->ls_control_exp,
2563                                                      LUSTRE_SEQ_METADATA,
2564                                                      prefix, NULL);
2565                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2566                         } else
2567                                 rc = -ENOMEM;
2568
2569                         if (rc)
2570                                 RETURN(rc);
2571
2572                         LASSERT(ls->ls_server_seq != NULL);
2573
2574                         rc = seq_server_set_cli(ls->ls_server_seq,
2575                                                 ls->ls_client_seq,
2576                                                 env);
2577                 }
2578         }
2579
2580         RETURN(rc);
2581 }
2582
2583 static void mdt_seq_fini_cli(struct mdt_device *m)
2584 {
2585         struct lu_site *ls;
2586         int rc;
2587
2588         ENTRY;
2589
2590         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2591
2592         if (ls && ls->ls_server_seq)
2593                 seq_server_set_cli(ls->ls_server_seq,
2594                                    NULL, NULL);
2595
2596         if (ls && ls->ls_control_exp) {
2597                 rc = obd_disconnect(ls->ls_control_exp);
2598                 if (rc) {
2599                         CERROR("failure to disconnect "
2600                                "obd: %d\n", rc);
2601                 }
2602                 ls->ls_control_exp = NULL;
2603         }
2604         EXIT;
2605 }
2606
2607 /*
2608  * FLD wrappers
2609  */
2610 static int mdt_fld_fini(const struct lu_env *env,
2611                         struct mdt_device *m)
2612 {
2613         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2614         ENTRY;
2615
2616         if (ls && ls->ls_server_fld) {
2617                 fld_server_fini(ls->ls_server_fld, env);
2618                 OBD_FREE_PTR(ls->ls_server_fld);
2619                 ls->ls_server_fld = NULL;
2620         }
2621
2622         if (ls && ls->ls_client_fld != NULL) {
2623                 fld_client_fini(ls->ls_client_fld);
2624                 OBD_FREE_PTR(ls->ls_client_fld);
2625                 ls->ls_client_fld = NULL;
2626         }
2627
2628         RETURN(0);
2629 }
2630
2631 static int mdt_fld_init(const struct lu_env *env,
2632                         const char *uuid,
2633                         struct mdt_device *m)
2634 {
2635         struct lu_fld_target target;
2636         struct lu_site *ls;
2637         int rc;
2638         ENTRY;
2639
2640         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2641
2642         OBD_ALLOC_PTR(ls->ls_server_fld);
2643         if (ls->ls_server_fld == NULL)
2644                 RETURN(rc = -ENOMEM);
2645
2646         rc = fld_server_init(ls->ls_server_fld,
2647                              m->mdt_bottom, uuid, env);
2648         if (rc) {
2649                 OBD_FREE_PTR(ls->ls_server_fld);
2650                 ls->ls_server_fld = NULL;
2651         }
2652
2653         OBD_ALLOC_PTR(ls->ls_client_fld);
2654         if (!ls->ls_client_fld)
2655                 GOTO(out_fld_fini, rc = -ENOMEM);
2656
2657         rc = fld_client_init(ls->ls_client_fld, uuid,
2658                              LUSTRE_CLI_FLD_HASH_DHT);
2659         if (rc) {
2660                 CERROR("can't init FLD, err %d\n",  rc);
2661                 OBD_FREE_PTR(ls->ls_client_fld);
2662                 GOTO(out_fld_fini, rc);
2663         }
2664
2665         target.ft_srv = ls->ls_server_fld;
2666         target.ft_idx = ls->ls_node_id;
2667         target.ft_exp = NULL;
2668
2669         fld_client_add_target(ls->ls_client_fld, &target);
2670         EXIT;
2671 out_fld_fini:
2672         if (rc)
2673                 mdt_fld_fini(env, m);
2674         return rc;
2675 }
2676
2677 /* device init/fini methods */
2678 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2679 {
2680         if (m->mdt_regular_service != NULL) {
2681                 ptlrpc_unregister_service(m->mdt_regular_service);
2682                 m->mdt_regular_service = NULL;
2683         }
2684         if (m->mdt_readpage_service != NULL) {
2685                 ptlrpc_unregister_service(m->mdt_readpage_service);
2686                 m->mdt_readpage_service = NULL;
2687         }
2688         if (m->mdt_setattr_service != NULL) {
2689                 ptlrpc_unregister_service(m->mdt_setattr_service);
2690                 m->mdt_setattr_service = NULL;
2691         }
2692         if (m->mdt_mdsc_service != NULL) {
2693                 ptlrpc_unregister_service(m->mdt_mdsc_service);
2694                 m->mdt_mdsc_service = NULL;
2695         }
2696         if (m->mdt_mdss_service != NULL) {
2697                 ptlrpc_unregister_service(m->mdt_mdss_service);
2698                 m->mdt_mdss_service = NULL;
2699         }
2700         if (m->mdt_dtss_service != NULL) {
2701                 ptlrpc_unregister_service(m->mdt_dtss_service);
2702                 m->mdt_dtss_service = NULL;
2703         }
2704         if (m->mdt_fld_service != NULL) {
2705                 ptlrpc_unregister_service(m->mdt_fld_service);
2706                 m->mdt_fld_service = NULL;
2707         }
2708 }
2709
2710 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2711 {
2712         int rc;
2713         static struct ptlrpc_service_conf conf;
2714         ENTRY;
2715
2716         conf = (typeof(conf)) {
2717                 .psc_nbufs            = MDS_NBUFS,
2718                 .psc_bufsize          = MDS_BUFSIZE,
2719                 .psc_max_req_size     = MDS_MAXREQSIZE,
2720                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2721                 .psc_req_portal       = MDS_REQUEST_PORTAL,
2722                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2723                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2724                 /*
2725                  * We'd like to have a mechanism to set this on a per-device
2726                  * basis, but alas...
2727                  */
2728                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2729                                        MDT_MAX_THREADS),
2730                 .psc_ctx_tags      = LCT_MD_THREAD
2731         };
2732
2733         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2734         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2735                            "mdt_ldlm_client", m->mdt_ldlm_client);
2736
2737         m->mdt_regular_service =
2738                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2739                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2740                                      NULL);
2741         if (m->mdt_regular_service == NULL)
2742                 RETURN(-ENOMEM);
2743
2744         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2745         if (rc)
2746                 GOTO(err_mdt_svc, rc);
2747
2748         /*
2749          * readpage service configuration. Parameters have to be adjusted,
2750          * ideally.
2751          */
2752         conf = (typeof(conf)) {
2753                 .psc_nbufs            = MDS_NBUFS,
2754                 .psc_bufsize          = MDS_BUFSIZE,
2755                 .psc_max_req_size     = MDS_MAXREQSIZE,
2756                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2757                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2758                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2759                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2760                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2761                                        MDT_MAX_THREADS),
2762                 .psc_ctx_tags      = LCT_MD_THREAD
2763         };
2764         m->mdt_readpage_service =
2765                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2766                                      LUSTRE_MDT_NAME "_readpage",
2767                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2768                                      NULL);
2769
2770         if (m->mdt_readpage_service == NULL) {
2771                 CERROR("failed to start readpage service\n");
2772                 GOTO(err_mdt_svc, rc = -ENOMEM);
2773         }
2774
2775         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2776
2777         /*
2778          * setattr service configuration.
2779          */
2780         conf = (typeof(conf)) {
2781                 .psc_nbufs            = MDS_NBUFS,
2782                 .psc_bufsize          = MDS_BUFSIZE,
2783                 .psc_max_req_size     = MDS_MAXREQSIZE,
2784                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2785                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2786                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2787                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2788                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2789                                        MDT_MAX_THREADS),
2790                 .psc_ctx_tags      = LCT_MD_THREAD
2791         };
2792
2793         m->mdt_setattr_service =
2794                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2795                                      LUSTRE_MDT_NAME "_setattr",
2796                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2797                                      NULL);
2798
2799         if (!m->mdt_setattr_service) {
2800                 CERROR("failed to start setattr service\n");
2801                 GOTO(err_mdt_svc, rc = -ENOMEM);
2802         }
2803
2804         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2805         if (rc)
2806                 GOTO(err_mdt_svc, rc);
2807
2808         /*
2809          * sequence controller service configuration
2810          */
2811         conf = (typeof(conf)) {
2812                 .psc_nbufs = MDS_NBUFS,
2813                 .psc_bufsize = MDS_BUFSIZE,
2814                 .psc_max_req_size = SEQ_MAXREQSIZE,
2815                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2816                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2817                 .psc_rep_portal = MDC_REPLY_PORTAL,
2818                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2819                 .psc_num_threads = SEQ_NUM_THREADS,
2820                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2821         };
2822
2823         m->mdt_mdsc_service =
2824                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2825                                      LUSTRE_MDT_NAME"_mdsc",
2826                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2827                                      NULL);
2828         if (!m->mdt_mdsc_service) {
2829                 CERROR("failed to start seq controller service\n");
2830                 GOTO(err_mdt_svc, rc = -ENOMEM);
2831         }
2832
2833         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2834         if (rc)
2835                 GOTO(err_mdt_svc, rc);
2836
2837         /*
2838          * metadata sequence server service configuration
2839          */
2840         conf = (typeof(conf)) {
2841                 .psc_nbufs = MDS_NBUFS,
2842                 .psc_bufsize = MDS_BUFSIZE,
2843                 .psc_max_req_size = SEQ_MAXREQSIZE,
2844                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2845                 .psc_req_portal = SEQ_METADATA_PORTAL,
2846                 .psc_rep_portal = MDC_REPLY_PORTAL,
2847                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2848                 .psc_num_threads = SEQ_NUM_THREADS,
2849                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2850         };
2851
2852         m->mdt_mdss_service =
2853                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2854                                      LUSTRE_MDT_NAME"_mdss",
2855                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2856                                      NULL);
2857         if (!m->mdt_mdss_service) {
2858                 CERROR("failed to start metadata seq server service\n");
2859                 GOTO(err_mdt_svc, rc = -ENOMEM);
2860         }
2861
2862         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2863         if (rc)
2864                 GOTO(err_mdt_svc, rc);
2865
2866
2867         /*
2868          * Data sequence server service configuration. We want to have really
2869          * cluster-wide sequences space. This is why we start only one sequence
2870          * controller which manages space.
2871          */
2872         conf = (typeof(conf)) {
2873                 .psc_nbufs = MDS_NBUFS,
2874                 .psc_bufsize = MDS_BUFSIZE,
2875                 .psc_max_req_size = SEQ_MAXREQSIZE,
2876                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2877                 .psc_req_portal = SEQ_DATA_PORTAL,
2878                 .psc_rep_portal = OSC_REPLY_PORTAL,
2879                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2880                 .psc_num_threads = SEQ_NUM_THREADS,
2881                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2882         };
2883
2884         m->mdt_dtss_service =
2885                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2886                                      LUSTRE_MDT_NAME"_dtss",
2887                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2888                                      NULL);
2889         if (!m->mdt_dtss_service) {
2890                 CERROR("failed to start data seq server service\n");
2891                 GOTO(err_mdt_svc, rc = -ENOMEM);
2892         }
2893
2894         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2895         if (rc)
2896                 GOTO(err_mdt_svc, rc);
2897
2898         /* FLD service start */
2899         conf = (typeof(conf)) {
2900                 .psc_nbufs            = MDS_NBUFS,
2901                 .psc_bufsize          = MDS_BUFSIZE,
2902                 .psc_max_req_size     = FLD_MAXREQSIZE,
2903                 .psc_max_reply_size   = FLD_MAXREPSIZE,
2904                 .psc_req_portal       = FLD_REQUEST_PORTAL,
2905                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2906                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2907                 .psc_num_threads      = FLD_NUM_THREADS,
2908                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
2909         };
2910
2911         m->mdt_fld_service =
2912                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2913                                      LUSTRE_MDT_NAME"_fld",
2914                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2915                                      NULL);
2916         if (!m->mdt_fld_service) {
2917                 CERROR("failed to start fld service\n");
2918                 GOTO(err_mdt_svc, rc = -ENOMEM);
2919         }
2920
2921         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
2922         if (rc)
2923                 GOTO(err_mdt_svc, rc);
2924
2925         EXIT;
2926 err_mdt_svc:
2927         if (rc)
2928                 mdt_stop_ptlrpc_service(m);
2929
2930         return rc;
2931 }
2932
2933 static void mdt_stack_fini(const struct lu_env *env,
2934                            struct mdt_device *m, struct lu_device *top)
2935 {
2936         struct lu_device        *d = top, *n;
2937         struct lustre_cfg_bufs  *bufs;
2938         struct lustre_cfg       *lcfg;
2939         struct mdt_thread_info  *info;
2940         ENTRY;
2941
2942         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2943         LASSERT(info != NULL);
2944
2945         bufs = &info->mti_u.bufs;
2946         /* process cleanup */
2947         lustre_cfg_bufs_reset(bufs, NULL);
2948         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2949         if (!lcfg) {
2950                 CERROR("Cannot alloc lcfg!\n");
2951                 return;
2952         }
2953         LASSERT(top);
2954         top->ld_ops->ldo_process_config(env, top, lcfg);
2955         lustre_cfg_free(lcfg);
2956
2957         lu_site_purge(env, top->ld_site, ~0);
2958         while (d != NULL) {
2959                 struct obd_type *type;
2960                 struct lu_device_type *ldt = d->ld_type;
2961
2962                 /* each fini() returns next device in stack of layers
2963                  * * so we can avoid the recursion */
2964                 n = ldt->ldt_ops->ldto_device_fini(env, d);
2965                 lu_device_put(d);
2966                 ldt->ldt_ops->ldto_device_free(env, d);
2967                 type = ldt->ldt_obd_type;
2968                 type->typ_refcnt--;
2969                 class_put_type(type);
2970
2971                 /* switch to the next device in the layer */
2972                 d = n;
2973         }
2974         m->mdt_child = NULL;
2975 }
2976
2977 static struct lu_device *mdt_layer_setup(const struct lu_env *env,
2978                                          const char *typename,
2979                                          struct lu_device *child,
2980                                          struct lustre_cfg *cfg)
2981 {
2982         struct obd_type       *type;
2983         struct lu_device_type *ldt;
2984         struct lu_device      *d;
2985         int rc;
2986         ENTRY;
2987
2988         /* find the type */
2989         type = class_get_type(typename);
2990         if (!type) {
2991                 CERROR("Unknown type: '%s'\n", typename);
2992                 GOTO(out, rc = -ENODEV);
2993         }
2994
2995         rc = lu_context_refill(&env->le_ctx);
2996         if (rc != 0) {
2997                 CERROR("Failure to refill context: '%d'\n", rc);
2998                 GOTO(out_type, rc);
2999         }
3000
3001         if (env->le_ses != NULL) {
3002                 rc = lu_context_refill(env->le_ses);
3003                 if (rc != 0) {
3004                         CERROR("Failure to refill session: '%d'\n", rc);
3005                         GOTO(out_type, rc);
3006                 }
3007         }
3008
3009         ldt = type->typ_lu;
3010         if (ldt == NULL) {
3011                 CERROR("type: '%s'\n", typename);
3012                 GOTO(out_type, rc = -EINVAL);
3013         }
3014
3015         ldt->ldt_obd_type = type;
3016         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
3017         if (IS_ERR(d)) {
3018                 CERROR("Cannot allocate device: '%s'\n", typename);
3019                 GOTO(out_type, rc = -ENODEV);
3020         }
3021
3022         LASSERT(child->ld_site);
3023         d->ld_site = child->ld_site;
3024
3025         type->typ_refcnt++;
3026         rc = ldt->ldt_ops->ldto_device_init(env, d, child);
3027         if (rc) {
3028                 CERROR("can't init device '%s', rc %d\n", typename, rc);
3029                 GOTO(out_alloc, rc);
3030         }
3031         lu_device_get(d);
3032
3033         RETURN(d);
3034
3035 out_alloc:
3036         ldt->ldt_ops->ldto_device_free(env, d);
3037         type->typ_refcnt--;
3038 out_type:
3039         class_put_type(type);
3040 out:
3041         return ERR_PTR(rc);
3042 }
3043
3044 static int mdt_stack_init(const struct lu_env *env,
3045                           struct mdt_device *m, struct lustre_cfg *cfg)
3046 {
3047         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3048         struct lu_device  *tmp;
3049         struct md_device  *md;
3050         int rc;
3051         ENTRY;
3052
3053         /* init the stack */
3054         tmp = mdt_layer_setup(env, LUSTRE_OSD_NAME, d, cfg);
3055         if (IS_ERR(tmp)) {
3056                 RETURN(PTR_ERR(tmp));
3057         }
3058         m->mdt_bottom = lu2dt_dev(tmp);
3059         d = tmp;
3060         tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
3061         if (IS_ERR(tmp)) {
3062                 GOTO(out, rc = PTR_ERR(tmp));
3063         }
3064         d = tmp;
3065         md = lu2md_dev(d);
3066
3067         tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
3068         if (IS_ERR(tmp)) {
3069                 GOTO(out, rc = PTR_ERR(tmp));
3070         }
3071         d = tmp;
3072         /*set mdd upcall device*/
3073         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
3074
3075         md = lu2md_dev(d);
3076         /*set cmm upcall device*/
3077         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
3078
3079         m->mdt_child = lu2md_dev(d);
3080
3081         /* process setup config */
3082         tmp = &m->mdt_md_dev.md_lu_dev;
3083         rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
3084         GOTO(out, rc);
3085 out:
3086         /* fini from last known good lu_device */
3087         if (rc)
3088                 mdt_stack_fini(env, m, d);
3089
3090         return rc;
3091 }
3092
3093 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
3094 {
3095         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3096         struct lu_site    *ls = d->ld_site;
3097
3098         ENTRY;
3099         target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
3100
3101         ping_evictor_stop();
3102         mdt_stop_ptlrpc_service(m);
3103
3104         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3105         m->mdt_rmtacl_cache = NULL;
3106
3107         upcall_cache_cleanup(m->mdt_identity_cache);
3108         m->mdt_identity_cache = NULL;
3109
3110         if (m->mdt_namespace != NULL) {
3111                 ldlm_namespace_free(m->mdt_namespace, 0);
3112                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
3113         }
3114
3115         mdt_seq_fini(env, m);
3116         mdt_seq_fini_cli(m);
3117         mdt_fld_fini(env, m);
3118
3119         if (m->mdt_rootsquash_info) {
3120                 OBD_FREE_PTR(m->mdt_rootsquash_info);
3121                 m->mdt_rootsquash_info = NULL;
3122         }
3123
3124         cleanup_capas(CAPA_SITE_SERVER);
3125         del_timer(&m->mdt_ck_timer);
3126         mdt_ck_thread_stop(m);
3127
3128         mdt_fs_cleanup(env, m);
3129
3130         /* finish the stack */
3131         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3132
3133         if (ls) {
3134                 lu_site_fini(ls);
3135                 OBD_FREE_PTR(ls);
3136                 d->ld_site = NULL;
3137         }
3138         LASSERT(atomic_read(&d->ld_ref) == 0);
3139         md_device_fini(&m->mdt_md_dev);
3140
3141         EXIT;
3142 }
3143
3144 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
3145
3146 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
3147                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
3148 {
3149         struct lprocfs_static_vars lvars;
3150         struct mdt_thread_info    *info;
3151         struct obd_device         *obd;
3152         const char                *dev = lustre_cfg_string(cfg, 0);
3153         const char                *num = lustre_cfg_string(cfg, 2);
3154         struct lu_site            *s;
3155         int                        rc;
3156         ENTRY;
3157
3158         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3159         LASSERT(info != NULL);
3160
3161         obd = class_name2obd(dev);
3162         LASSERT(obd);
3163
3164         spin_lock_init(&m->mdt_transno_lock);
3165
3166         m->mdt_max_mdsize = MAX_MD_SIZE;
3167         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
3168
3169         spin_lock_init(&m->mdt_ioepoch_lock);
3170         /* Temporary. should parse mount option. */
3171         m->mdt_opts.mo_user_xattr = 0;
3172         m->mdt_opts.mo_acl = 0;
3173         m->mdt_opts.mo_compat_resname = 0;
3174         m->mdt_opts.mo_mds_capa = 0;
3175         m->mdt_opts.mo_oss_capa = 0;
3176         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
3177         m->mdt_capa_timeout = CAPA_TIMEOUT;
3178         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
3179         obd->obd_replayable = 1;
3180         spin_lock_init(&m->mdt_client_bitmap_lock);
3181
3182         OBD_ALLOC_PTR(s);
3183         if (s == NULL)
3184                 RETURN(-ENOMEM);
3185
3186         md_device_init(&m->mdt_md_dev, ldt);
3187         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
3188         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
3189         /* set this lu_device to obd, because error handling need it */
3190         obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
3191
3192         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
3193         if (rc) {
3194                 CERROR("can't init lu_site, rc %d\n", rc);
3195                 GOTO(err_free_site, rc);
3196         }
3197
3198         lprocfs_init_vars(mdt, &lvars);
3199         rc = lprocfs_obd_setup(obd, lvars.obd_vars);
3200         if (rc) {
3201                 CERROR("can't init lprocfs, rc %d\n", rc);
3202                 GOTO(err_fini_site, rc);
3203         }
3204
3205         /* init the stack */
3206         rc = mdt_stack_init(env, m, cfg);
3207         if (rc) {
3208                 CERROR("can't init device stack, rc %d\n", rc);
3209                 GOTO(err_fini_site, rc);
3210         }
3211
3212         /* set server index */
3213         LASSERT(num);
3214         s->ls_node_id = simple_strtol(num, NULL, 10);
3215
3216         rc = mdt_fld_init(env, obd->obd_name, m);
3217         if (rc)
3218                 GOTO(err_fini_stack, rc);
3219
3220         rc = mdt_seq_init(env, obd->obd_name, m);
3221         if (rc)
3222                 GOTO(err_fini_fld, rc);
3223
3224         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3225                  LUSTRE_MDT_NAME"-%p", m);
3226         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3227                                               LDLM_NAMESPACE_SERVER);
3228         if (m->mdt_namespace == NULL)
3229                 GOTO(err_fini_seq, rc = -ENOMEM);
3230
3231         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3232         /* set obd_namespace for compatibility with old code */
3233         obd->obd_namespace = m->mdt_namespace;
3234
3235         m->mdt_identity_cache = upcall_cache_init(obd->obd_name,
3236                                                   MDT_IDENTITY_UPCALL_PATH,
3237                                                   &mdt_identity_upcall_cache_ops);
3238         if (IS_ERR(m->mdt_identity_cache)) {
3239                 rc = PTR_ERR(m->mdt_identity_cache);
3240                 m->mdt_identity_cache = NULL;
3241                 GOTO(err_free_ns, rc);
3242         }
3243
3244         m->mdt_rmtacl_cache = upcall_cache_init(obd->obd_name,
3245                                                 MDT_RMTACL_UPCALL_PATH,
3246                                                 &mdt_rmtacl_upcall_cache_ops);
3247         if (IS_ERR(m->mdt_rmtacl_cache)) {
3248                 rc = PTR_ERR(m->mdt_rmtacl_cache);
3249                 m->mdt_rmtacl_cache = NULL;
3250                 GOTO(err_free_ns, rc);
3251         }
3252
3253         rc = mdt_ck_thread_start(m);
3254         if (rc)
3255                 GOTO(err_free_ns, rc);
3256         m->mdt_ck_timer.function = mdt_ck_timer_callback;
3257         m->mdt_ck_timer.data = (unsigned long)m;
3258         init_timer(&m->mdt_ck_timer);
3259
3260         s->ls_capa_keys = m->mdt_capa_keys;
3261         s->ls_capa_timeout = m->mdt_capa_timeout;
3262         s->ls_capa_alg = m->mdt_capa_alg;
3263
3264         rc = mdt_start_ptlrpc_service(m);
3265         if (rc)
3266                 GOTO(err_capa, rc);
3267
3268         ping_evictor_start();
3269         rc = mdt_fs_setup(env, m, obd);
3270         if (rc)
3271                 GOTO(err_stop_service, rc);
3272
3273         if(obd->obd_recovering == 0)
3274                 mdt_postrecov(env, m);
3275
3276         m->mdt_opts.mo_no_gss_support = 1;
3277
3278         RETURN(0);
3279
3280 err_stop_service:
3281         mdt_stop_ptlrpc_service(m);
3282 err_capa:
3283         del_timer(&m->mdt_ck_timer);
3284         mdt_ck_thread_stop(m);
3285 err_free_ns:
3286         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3287         m->mdt_rmtacl_cache = NULL;
3288         upcall_cache_cleanup(m->mdt_identity_cache);
3289         m->mdt_identity_cache = NULL;
3290         ldlm_namespace_free(m->mdt_namespace, 0);
3291         obd->obd_namespace = m->mdt_namespace = NULL;
3292 err_fini_seq:
3293         mdt_seq_fini(env, m);
3294 err_fini_fld:
3295         mdt_fld_fini(env, m);
3296 err_fini_stack:
3297         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3298 err_fini_site:
3299         lu_site_fini(s);
3300 err_free_site:
3301         OBD_FREE_PTR(s);
3302
3303         md_device_fini(&m->mdt_md_dev);
3304         return (rc);
3305 }
3306
3307 /* FIXME: this macro is copied from lnet/libcfs/nidstring.c */
3308 #define LNET_NIDSTR_SIZE   32      /* size of each one (see below for usage) */
3309 static void do_process_nosquash_nids(struct mdt_device *m, char *buf)
3310 {
3311         struct rootsquash_info *rsi = m->mdt_rootsquash_info;
3312         char str[LNET_NIDSTR_SIZE], *end;
3313         lnet_nid_t nid;
3314
3315         LASSERT(rsi);
3316         rsi->rsi_n_nosquash_nids = 0;
3317         while (rsi->rsi_n_nosquash_nids < N_NOSQUASH_NIDS) {
3318                 end = strchr(buf, ',');
3319                 memset(str, 0, sizeof(str));
3320                 if (end)
3321                         strncpy(str, buf, min_t(int, sizeof(str), end - buf));
3322                 else
3323                         strncpy(str, buf, min_t(int, sizeof(str), strlen(buf)));
3324
3325                 if (!strcmp(str, "*")) {
3326                         nid = LNET_NID_ANY;
3327                 } else {
3328                         nid = libcfs_str2nid(str);
3329                         if (nid == LNET_NID_ANY)
3330                                 goto ignore;
3331                 }
3332                 rsi->rsi_nosquash_nids[rsi->rsi_n_nosquash_nids++] = nid;
3333 ignore:
3334                 if (!end || (*(end + 1) == 0))
3335                         return;
3336                 buf = end + 1;
3337         }
3338 }
3339
3340 /* used by MGS to process specific configurations */
3341 static int mdt_process_config(const struct lu_env *env,
3342                               struct lu_device *d, struct lustre_cfg *cfg)
3343 {
3344         struct mdt_device *m = mdt_dev(d);
3345         struct md_device *md_next  = m->mdt_child;
3346         struct lu_device *next = md2lu_dev(md_next);
3347         int rc = 0;
3348         ENTRY;
3349
3350         switch (cfg->lcfg_command) {
3351         case LCFG_PARAM: {
3352                 int i;
3353
3354                 for (i = 1; i < cfg->lcfg_bufcount; i++) {
3355                         char *key, *val;
3356
3357                         key = lustre_cfg_buf(cfg, i);
3358                         val = strchr(key, '=');
3359                         if (!val || (*(val + 1) == 0)) {
3360                                 CERROR("Can't parse param %s\n", key);
3361                                 rc = -EINVAL;
3362                                 /* continue parsing other params */
3363                                 continue;
3364                         }
3365
3366                         val++;
3367                         if (class_match_param(key,
3368                                               PARAM_GSS_SUPPORT, 0) == 0) {
3369                                 if (memcmp(val, "no", 2) == 0) {
3370                                         m->mdt_opts.mo_no_gss_support = 1;
3371                                 } else if (memcmp(val, "yes", 3) == 0) {
3372                                         m->mdt_opts.mo_no_gss_support = 0;
3373                                 } else {
3374                                         CERROR("Can't parse param %s\n", key);
3375                                         rc = -EINVAL;
3376                                         /* continue parsing other params */
3377                                         continue;
3378                                 }
3379                         } else if (class_match_param(key,
3380                                         PARAM_ROOTSQUASH_UID, 0) == 0) {
3381                                 if (!m->mdt_rootsquash_info)
3382                                         OBD_ALLOC_PTR(m->mdt_rootsquash_info);
3383                                 if (!m->mdt_rootsquash_info)
3384                                         RETURN(-ENOMEM);
3385
3386                                 m->mdt_rootsquash_info->rsi_uid =
3387                                         simple_strtoul(val, NULL, 0);
3388                         } else if (class_match_param(key,
3389                                         PARAM_ROOTSQUASH_GID, 0) == 0) {
3390                                 if (!m->mdt_rootsquash_info)
3391                                         OBD_ALLOC_PTR(m->mdt_rootsquash_info);
3392                                 if (!m->mdt_rootsquash_info)
3393                                         RETURN(-ENOMEM);
3394
3395                                 m->mdt_rootsquash_info->rsi_gid =
3396                                         simple_strtoul(val, NULL, 0);
3397                         } else if (class_match_param(key,
3398                                         PARAM_ROOTSQUASH_SKIPS, 0) == 0) {
3399                                 if (!m->mdt_rootsquash_info)
3400                                         OBD_ALLOC_PTR(m->mdt_rootsquash_info);
3401                                 if (!m->mdt_rootsquash_info)
3402                                         RETURN(-ENOMEM);
3403
3404                                 do_process_nosquash_nids(m, val);
3405                         } else {
3406                                 rc = -EINVAL;
3407                         }
3408                 }
3409
3410                 if (rc)
3411                         /* others are passed further */
3412                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
3413                 break;
3414         }
3415         case LCFG_ADD_MDC:
3416                 /*
3417                  * Add mdc hook to get first MDT uuid and connect it to
3418                  * ls->controller to use for seq manager.
3419                  */
3420                 rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
3421                 if (rc) {
3422                         CERROR("can't initialize controller export, "
3423                                "rc %d\n", rc);
3424                 }
3425         default:
3426                 /* others are passed further */
3427                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3428                 break;
3429         }
3430         RETURN(rc);
3431 }
3432
3433 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
3434                                           const struct lu_object_header *hdr,
3435                                           struct lu_device *d)
3436 {
3437         struct mdt_object *mo;
3438
3439         ENTRY;
3440
3441         OBD_ALLOC_PTR(mo);
3442         if (mo != NULL) {
3443                 struct lu_object *o;
3444                 struct lu_object_header *h;
3445
3446                 o = &mo->mot_obj.mo_lu;
3447                 h = &mo->mot_header;
3448                 lu_object_header_init(h);
3449                 lu_object_init(o, h, d);
3450                 lu_object_add_top(h, o);
3451                 o->lo_ops = &mdt_obj_ops;
3452                 RETURN(o);
3453         } else
3454                 RETURN(NULL);
3455 }
3456
3457 static int mdt_object_init(const struct lu_env *env, struct lu_object *o)
3458 {
3459         struct mdt_device *d = mdt_dev(o->lo_dev);
3460         struct lu_device  *under;
3461         struct lu_object  *below;
3462         int                rc = 0;
3463         ENTRY;
3464
3465         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3466                PFID(lu_object_fid(o)));
3467
3468         under = &d->mdt_child->md_lu_dev;
3469         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
3470         if (below != NULL) {
3471                 lu_object_add(o, below);
3472         } else
3473                 rc = -ENOMEM;
3474         RETURN(rc);
3475 }
3476
3477 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
3478 {
3479         struct mdt_object *mo = mdt_obj(o);
3480         struct lu_object_header *h;
3481         ENTRY;
3482
3483         h = o->lo_header;
3484         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3485                PFID(lu_object_fid(o)));
3486
3487         lu_object_fini(o);
3488         lu_object_header_fini(h);
3489         OBD_FREE_PTR(mo);
3490         EXIT;
3491 }
3492
3493 static int mdt_object_print(const struct lu_env *env, void *cookie,
3494                             lu_printer_t p, const struct lu_object *o)
3495 {
3496         return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p", o);
3497 }
3498
3499 static struct lu_device_operations mdt_lu_ops = {
3500         .ldo_object_alloc   = mdt_object_alloc,
3501         .ldo_process_config = mdt_process_config
3502 };
3503
3504 static struct lu_object_operations mdt_obj_ops = {
3505         .loo_object_init    = mdt_object_init,
3506         .loo_object_free    = mdt_object_free,
3507         .loo_object_print   = mdt_object_print
3508 };
3509
3510 /* mds_connect_internal */
3511 static int mdt_connect_internal(struct obd_export *exp,
3512                                 struct mdt_device *mdt,
3513                                 struct obd_connect_data *data)
3514 {
3515         __u64 flags;
3516
3517         if (data != NULL) {
3518                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3519                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3520
3521                 /* If no known bits (which should not happen, probably,
3522                    as everybody should support LOOKUP and UPDATE bits at least)
3523                    revert to compat mode with plain locks. */
3524                 if (!data->ocd_ibits_known &&
3525                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
3526                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
3527
3528                 if (!mdt->mdt_opts.mo_acl)
3529                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3530
3531                 if (!mdt->mdt_opts.mo_user_xattr)
3532                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3533
3534                 if (!mdt->mdt_opts.mo_mds_capa)
3535                         data->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
3536
3537                 if (!mdt->mdt_opts.mo_oss_capa)
3538                         data->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
3539
3540                 exp->exp_connect_flags = data->ocd_connect_flags;
3541                 data->ocd_version = LUSTRE_VERSION_CODE;
3542                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
3543         }
3544
3545         if (mdt->mdt_opts.mo_acl &&
3546             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3547                 CWARN("%s: MDS requires ACL support but client does not\n",
3548                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3549                 return -EBADE;
3550         }
3551
3552         flags = OBD_CONNECT_LCL_CLIENT | OBD_CONNECT_RMT_CLIENT;
3553         if ((exp->exp_connect_flags & flags) == flags) {
3554                 CWARN("%s: both local and remote client flags are set\n",
3555                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3556                 return -EBADE;
3557         }
3558
3559         if (mdt->mdt_opts.mo_mds_capa &&
3560             ((exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) == 0)) {
3561                 CWARN("%s: MDS requires capability support, but client not\n",
3562                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3563                 return -EBADE;
3564         }
3565
3566         if (mdt->mdt_opts.mo_oss_capa &&
3567             ((exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA) == 0)) {
3568                 CWARN("%s: MDS requires OSS capability support, "
3569                       "but client not\n",
3570                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3571                 return -EBADE;
3572         }
3573
3574         return 0;
3575 }
3576
3577 /* mds_connect copy */
3578 static int mdt_obd_connect(const struct lu_env *env,
3579                            struct lustre_handle *conn, struct obd_device *obd,
3580                            struct obd_uuid *cluuid,
3581                            struct obd_connect_data *data)
3582 {
3583         struct mdt_export_data *med;
3584         struct mdt_client_data *mcd;
3585         struct obd_export      *exp;
3586         struct mdt_device      *mdt;
3587         int                     rc;
3588         ENTRY;
3589
3590         LASSERT(env != NULL);
3591         if (!conn || !obd || !cluuid)
3592                 RETURN(-EINVAL);
3593
3594         mdt = mdt_dev(obd->obd_lu_dev);
3595
3596         rc = class_connect(conn, obd, cluuid);
3597         if (rc)
3598                 RETURN(rc);
3599
3600         exp = class_conn2export(conn);
3601         LASSERT(exp != NULL);
3602         med = &exp->exp_mdt_data;
3603
3604         rc = mdt_connect_internal(exp, mdt, data);
3605         if (rc == 0) {
3606                 OBD_ALLOC_PTR(mcd);
3607                 if (mcd != NULL) {
3608                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
3609                         med->med_mcd = mcd;
3610                         rc = mdt_client_new(env, mdt, med);
3611                         if (rc != 0) {
3612                                 OBD_FREE_PTR(mcd);
3613                                 med->med_mcd = NULL;
3614                         }
3615                 } else
3616                         rc = -ENOMEM;
3617         }
3618
3619         if (rc != 0)
3620                 class_disconnect(exp);
3621         else
3622                 class_export_put(exp);
3623
3624         RETURN(rc);
3625 }
3626
3627 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
3628                              struct obd_uuid *cluuid,
3629                              struct obd_connect_data *data)
3630 {
3631         int rc;
3632         ENTRY;
3633
3634         if (exp == NULL || obd == NULL || cluuid == NULL)
3635                 RETURN(-EINVAL);
3636
3637         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
3638
3639         RETURN(rc);
3640 }
3641
3642 static int mdt_obd_disconnect(struct obd_export *exp)
3643 {
3644         struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3645         int rc;
3646         ENTRY;
3647
3648         LASSERT(exp);
3649         class_export_get(exp);
3650
3651         /* Disconnect early so that clients can't keep using export */
3652         rc = class_disconnect(exp);
3653         if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
3654                 ldlm_cancel_locks_for_export(exp);
3655
3656         /* complete all outstanding replies */
3657         spin_lock(&exp->exp_lock);
3658         while (!list_empty(&exp->exp_outstanding_replies)) {
3659                 struct ptlrpc_reply_state *rs =
3660                         list_entry(exp->exp_outstanding_replies.next,
3661                                    struct ptlrpc_reply_state, rs_exp_list);
3662                 struct ptlrpc_service *svc = rs->rs_service;
3663
3664                 spin_lock(&svc->srv_lock);
3665                 list_del_init(&rs->rs_exp_list);
3666                 ptlrpc_schedule_difficult_reply(rs);
3667                 spin_unlock(&svc->srv_lock);
3668         }
3669         spin_unlock(&exp->exp_lock);
3670
3671         class_export_put(exp);
3672         RETURN(rc);
3673 }
3674
3675 /* FIXME: Can we avoid using these two interfaces? */
3676 static int mdt_init_export(struct obd_export *exp)
3677 {
3678         struct mdt_export_data *med = &exp->exp_mdt_data;
3679         ENTRY;
3680
3681         INIT_LIST_HEAD(&med->med_open_head);
3682         spin_lock_init(&med->med_open_lock);
3683         exp->exp_connecting = 1;
3684         RETURN(0);
3685 }
3686
3687 static int mdt_destroy_export(struct obd_export *export)
3688 {
3689         struct mdt_export_data *med;
3690         struct obd_device      *obd = export->exp_obd;
3691         struct mdt_device      *mdt;
3692         struct mdt_thread_info *info;
3693         struct lu_env           env;
3694         struct md_attr         *ma;
3695         int rc = 0;
3696         ENTRY;
3697
3698         med = &export->exp_mdt_data;
3699         if (med->med_rmtclient)
3700                 mdt_cleanup_idmap(med);
3701
3702         target_destroy_export(export);
3703
3704         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3705                 RETURN(0);
3706
3707         mdt = mdt_dev(obd->obd_lu_dev);
3708         LASSERT(mdt != NULL);
3709
3710         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3711         if (rc)
3712                 RETURN(rc);
3713
3714         info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
3715         LASSERT(info != NULL);
3716         memset(info, 0, sizeof *info);
3717         info->mti_env = &env;
3718         info->mti_mdt = mdt;
3719
3720         ma = &info->mti_attr;
3721         ma->ma_lmm_size = mdt->mdt_max_mdsize;
3722         ma->ma_cookie_size = mdt->mdt_max_cookiesize;
3723         OBD_ALLOC(ma->ma_lmm, mdt->mdt_max_mdsize);
3724         OBD_ALLOC(ma->ma_cookie, mdt->mdt_max_cookiesize);
3725
3726         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
3727                 GOTO(out, rc = -ENOMEM);
3728         ma->ma_need = MA_LOV | MA_COOKIE;
3729
3730         /* Close any open files (which may also cause orphan unlinking). */
3731         spin_lock(&med->med_open_lock);
3732         while (!list_empty(&med->med_open_head)) {
3733                 struct list_head *tmp = med->med_open_head.next;
3734                 struct mdt_file_data *mfd =
3735                         list_entry(tmp, struct mdt_file_data, mfd_list);
3736                 struct md_attr *ma = &info->mti_attr;
3737
3738                 /* Remove mfd handle so it can't be found again.
3739                  * We are consuming the mfd_list reference here. */
3740                 class_handle_unhash(&mfd->mfd_handle);
3741                 list_del_init(&mfd->mfd_list);
3742                 spin_unlock(&med->med_open_lock);
3743                 mdt_mfd_close(info, mfd);
3744                 /* TODO: if we close the unlinked file,
3745                  * we need to remove it's objects from OST */
3746                 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
3747                 spin_lock(&med->med_open_lock);
3748         }
3749         spin_unlock(&med->med_open_lock);
3750         info->mti_mdt = NULL;
3751         mdt_client_del(&env, mdt, med);
3752
3753 out:
3754         if (ma->ma_lmm)
3755                 OBD_FREE(ma->ma_lmm, mdt->mdt_max_mdsize);
3756         if (ma->ma_cookie)
3757                 OBD_FREE(ma->ma_cookie, mdt->mdt_max_cookiesize);
3758         lu_env_fini(&env);
3759
3760         RETURN(rc);
3761 }
3762
3763 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
3764                       enum md_upcall_event ev)
3765 {
3766         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3767         struct md_device  *next  = m->mdt_child;
3768         struct mdt_thread_info *mti;
3769         int rc = 0;
3770         ENTRY;
3771
3772         switch (ev) {
3773                 case MD_LOV_SYNC:
3774                         rc = next->md_ops->mdo_maxsize_get(env, next,
3775                                         &m->mdt_max_mdsize,
3776                                         &m->mdt_max_cookiesize);
3777                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3778                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
3779                         break;
3780                 case MD_NO_TRANS:
3781                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3782                         mti->mti_no_need_trans = 1;
3783                         CDEBUG(D_INFO, "disable mdt trans for this thread\n");
3784                         break;
3785                 default:
3786                         CERROR("invalid event\n");
3787                         rc = -EINVAL;
3788                         break;
3789         }
3790         RETURN(rc);
3791 }
3792
3793 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3794                          void *karg, void *uarg)
3795 {
3796         struct lu_env      env;
3797         struct obd_device *obd= exp->exp_obd;
3798         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3799         struct dt_device  *dt = mdt->mdt_bottom;
3800         int rc;
3801
3802         ENTRY;
3803         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3804         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3805         if (rc)
3806                 RETURN(rc);
3807
3808         switch (cmd) {
3809         case OBD_IOC_SYNC:
3810                 rc = dt->dd_ops->dt_sync(&env, dt);
3811                 break;
3812
3813         case OBD_IOC_SET_READONLY:
3814                 rc = dt->dd_ops->dt_sync(&env, dt);
3815                 dt->dd_ops->dt_ro(&env, dt);
3816                 break;
3817
3818         case OBD_IOC_ABORT_RECOVERY:
3819                 CERROR("aborting recovery for device %s\n", obd->obd_name);
3820                 target_abort_recovery(obd);
3821                 break;
3822
3823         default:
3824                 CERROR("not supported cmd = %d for device %s\n",
3825                        cmd, obd->obd_name);
3826                 rc = -EOPNOTSUPP;
3827         }
3828
3829         lu_env_fini(&env);
3830         RETURN(rc);
3831 }
3832
3833 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
3834 {
3835         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
3836         int rc;
3837         ENTRY;
3838         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
3839         RETURN(rc);
3840 }
3841
3842 int mdt_obd_postrecov(struct obd_device *obd)
3843 {
3844         struct lu_env env;
3845         int rc;
3846
3847         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3848         if (rc)
3849                 RETURN(rc);
3850         rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
3851         lu_env_fini(&env);
3852         return rc;
3853 }
3854
3855 static struct obd_ops mdt_obd_device_ops = {
3856         .o_owner          = THIS_MODULE,
3857         .o_connect        = mdt_obd_connect,
3858         .o_reconnect      = mdt_obd_reconnect,
3859         .o_disconnect     = mdt_obd_disconnect,
3860         .o_init_export    = mdt_init_export,
3861         .o_destroy_export = mdt_destroy_export,
3862         .o_iocontrol      = mdt_iocontrol,
3863         .o_postrecov      = mdt_obd_postrecov
3864
3865 };
3866
3867 static struct lu_device* mdt_device_fini(const struct lu_env *env,
3868                                          struct lu_device *d)
3869 {
3870         struct mdt_device *m = mdt_dev(d);
3871
3872         mdt_fini(env, m);
3873         RETURN(NULL);
3874 }
3875
/* Free the mdt_device that embeds lu_device @d. */
static void mdt_device_free(const struct lu_env *env, struct lu_device *d)
{
        struct mdt_device *mdt;

        mdt = mdt_dev(d);
        OBD_FREE_PTR(mdt);
}
3882
3883 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
3884                                           struct lu_device_type *t,
3885                                           struct lustre_cfg *cfg)
3886 {
3887         struct lu_device  *l;
3888         struct mdt_device *m;
3889
3890         OBD_ALLOC_PTR(m);
3891         if (m != NULL) {
3892                 int rc;
3893
3894                 l = &m->mdt_md_dev.md_lu_dev;
3895                 rc = mdt_init0(env, m, t, cfg);
3896                 if (rc != 0) {
3897                         OBD_FREE_PTR(m);
3898                         l = ERR_PTR(rc);
3899                         return l;
3900                 }
3901                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
3902         } else
3903                 l = ERR_PTR(-ENOMEM);
3904         return l;
3905 }
3906
3907 /*
3908  * context key constructor/destructor
3909  */
3910 static void *mdt_key_init(const struct lu_context *ctx,
3911                           struct lu_context_key *key)
3912 {
3913         struct mdt_thread_info *info;
3914
3915         /*
3916          * check that no high order allocations are incurred.
3917          */
3918         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3919         OBD_ALLOC_PTR(info);
3920         if (info == NULL)
3921                 info = ERR_PTR(-ENOMEM);
3922         return info;
3923 }
3924
/* Destructor of mdt_thread_key: free the mdt_thread_info passed in @data. */
static void mdt_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *thread_info;

        thread_info = data;
        OBD_FREE_PTR(thread_info);
}
3931
3932 struct lu_context_key mdt_thread_key = {
3933         .lct_tags = LCT_MD_THREAD,
3934         .lct_init = mdt_key_init,
3935         .lct_fini = mdt_key_fini
3936 };
3937
3938 static void *mdt_txn_key_init(const struct lu_context *ctx,
3939                               struct lu_context_key *key)
3940 {
3941         struct mdt_txn_info *txi;
3942
3943         /*
3944          * check that no high order allocations are incurred.
3945          */
3946         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
3947         OBD_ALLOC_PTR(txi);
3948         if (txi == NULL)
3949                 txi = ERR_PTR(-ENOMEM);
3950         return txi;
3951 }
3952
/* Destructor of mdt_txn_key: free the mdt_txn_info passed in @data. */
static void mdt_txn_key_fini(const struct lu_context *ctx,
                             struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txn_info;

        txn_info = data;
        OBD_FREE_PTR(txn_info);
}
3959
3960 struct lu_context_key mdt_txn_key = {
3961         .lct_tags = LCT_TX_HANDLE,
3962         .lct_init = mdt_txn_key_init,
3963         .lct_fini = mdt_txn_key_fini
3964 };
3965
3966 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
3967 {
3968         return md_ucred(info->mti_env);
3969 }
3970
3971 static int mdt_type_init(struct lu_device_type *t)
3972 {
3973         int rc;
3974
3975         rc = lu_context_key_register(&mdt_thread_key);
3976         if (rc == 0)
3977                 rc = lu_context_key_register(&mdt_txn_key);
3978         return rc;
3979 }
3980
3981 static void mdt_type_fini(struct lu_device_type *t)
3982 {
3983         lu_context_key_degister(&mdt_thread_key);
3984         lu_context_key_degister(&mdt_txn_key);
3985 }
3986
3987 static struct lu_device_type_operations mdt_device_type_ops = {
3988         .ldto_init = mdt_type_init,
3989         .ldto_fini = mdt_type_fini,
3990
3991         .ldto_device_alloc = mdt_device_alloc,
3992         .ldto_device_free  = mdt_device_free,
3993         .ldto_device_fini  = mdt_device_fini
3994 };
3995
3996 static struct lu_device_type mdt_device_type = {
3997         .ldt_tags     = LU_DEVICE_MD,
3998         .ldt_name     = LUSTRE_MDT_NAME,
3999         .ldt_ops      = &mdt_device_type_ops,
4000         .ldt_ctx_tags = LCT_MD_THREAD
4001 };
4002
4003 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
4004         { "uuid",            lprocfs_rd_uuid,                0, 0 },
4005         { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 },
4006         { "num_exports",     lprocfs_rd_num_exports,         0, 0 },
4007         { 0 }
4008 };
4009
4010 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
4011         { "num_refs",        lprocfs_rd_numrefs,             0, 0 },
4012         { 0 }
4013 };
4014
4015 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
4016
4017 static int __init mdt_mod_init(void)
4018 {
4019         struct lprocfs_static_vars lvars;
4020         int rc;
4021
4022         printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
4023
4024         mdt_num_threads = MDT_NUM_THREADS;
4025         lprocfs_init_vars(mdt, &lvars);
4026         rc = class_register_type(&mdt_obd_device_ops, NULL,
4027                                  lvars.module_vars, LUSTRE_MDT_NAME,
4028                                  &mdt_device_type);
4029
4030         return rc;
4031 }
4032
4033 static void __exit mdt_mod_exit(void)
4034 {
4035         class_unregister_type(LUSTRE_MDT_NAME);
4036 }
4037
4038
/*
 * Build one designated entry of a struct mdt_handler table.
 *
 * The entry lands at index (pfx_op - pfx_first), i.e. the opcode's offset
 * from the first opcode of its family.  Its name is the stringified opcode,
 * its fail id is OBD_FAIL_<pfx>_<op><sfx>, and flags, handler and request
 * format are taken from the remaining arguments.
 */
#define DEF_HNDL(pfx, first, sfx, hflags, op, handler, rqfmt)           \
[pfx ## _ ## op - pfx ## _ ## first] = {                                \
        .mh_name    = #op,                                              \
        .mh_fail_id = OBD_FAIL_ ## pfx ## _  ## op ## sfx,              \
        .mh_opc     = pfx ## _  ## op,                                  \
        .mh_flags   = hflags,                                           \
        .mh_act     = handler,                                          \
        .mh_fmt     = rqfmt                                             \
}

/*
 * Request with an explicitly given format
 */
#define DEF_MDT_HNDL(hflags, op, handler, rqfmt)                        \
        DEF_HNDL(MDS, GETATTR, _NET, hflags, op, handler, rqfmt)

#define DEF_SEQ_HNDL(hflags, op, handler, rqfmt)                        \
        DEF_HNDL(SEQ, QUERY, _NET, hflags, op, handler, rqfmt)

#define DEF_FLD_HNDL(hflags, op, handler, rqfmt)                        \
        DEF_HNDL(FLD, QUERY, _NET, hflags, op, handler, rqfmt)

/*
 * Request with a format known in advance
 */
#define DEF_MDT_HNDL_F(hflags, op, handler)                             \
        DEF_HNDL(MDS, GETATTR, _NET, hflags, op, handler, &RQF_MDS_ ## op)

#define DEF_SEQ_HNDL_F(hflags, op, handler)                             \
        DEF_HNDL(SEQ, QUERY, _NET, hflags, op, handler, &RQF_SEQ_ ## op)

#define DEF_FLD_HNDL_F(hflags, op, handler)                             \
        DEF_HNDL(FLD, QUERY, _NET, hflags, op, handler, &RQF_FLD_ ## op)

/*
 * Request with a format we do not yet know
 */
#define DEF_MDT_HNDL_0(hflags, op, handler)                             \
        DEF_HNDL(MDS, GETATTR, _NET, hflags, op, handler, NULL)
4073
4074 static struct mdt_handler mdt_mds_ops[] = {
4075 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
4076 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
4077 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
4078 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR,      mdt_getattr),
4079 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
4080 DEF_MDT_HNDL_F(HABEO_CORPUS|MUTABOR,      SETXATTR,     mdt_setxattr),
4081 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
4082 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
4083 DEF_MDT_HNDL_F(0                        |MUTABOR,
4084                                           REINT,        mdt_reint),
4085 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
4086 DEF_MDT_HNDL_F(HABEO_CORPUS             , DONE_WRITING, mdt_done_writing),
4087 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
4088 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
4089 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
4090 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
4091 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle),
4092 DEF_MDT_HNDL_0(0           |HABEO_REFERO, RENEW_CAPA,   mdt_renew_capa)
4093 };
4094
4095 #define DEF_OBD_HNDL(flags, name, fn)                   \
4096         DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
4097
4098
4099 static struct mdt_handler mdt_obd_ops[] = {
4100         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
4101         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
4102         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
4103 };
4104
4105 #define DEF_DLM_HNDL_0(flags, name, fn)                   \
4106         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
4107 #define DEF_DLM_HNDL_F(flags, name, fn)                   \
4108         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
4109
4110 static struct mdt_handler mdt_dlm_ops[] = {
4111         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
4112         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
4113         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
4114         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
4115 };
4116
/*
 * Handler table for the LLOG opcodes; currently it has no entries.
 *
 * NOTE(review): an empty initializer on an unsized array is not ISO C before
 * C23 and relies on a compiler extension. mdt_regular_handlers still
 * attaches this table to the LLOG_ORIGIN_HANDLE_CREATE .. LLOG_LAST_OPC
 * slice, so confirm that the dispatcher never indexes into it for an
 * opcode in that range.
 */
4117 static struct mdt_handler mdt_llog_ops[] = {
4118 };
4119
/*
 * Declare one handler-table entry for a SEC_CTX opcode.
 *
 * Forwards to DEF_HNDL with the fixed arguments SEC_CTX, INIT and _NET,
 * flags 0, the caller's name/fn, and NULL as the last argument.
 */
4120 #define DEF_SEC_CTX_HNDL(name, fn)                      \
4121         DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
4122
/*
 * Handler table for the SEC_CTX opcodes. INIT, INIT_CONT and FINI are all
 * routed to the same function, mdt_sec_ctx_handle. mdt_regular_handlers
 * uses this table for the SEC_CTX_INIT .. SEC_LAST_OPC slice.
 */
4123 static struct mdt_handler mdt_sec_ctx_ops[] = {
4124         DEF_SEC_CTX_HNDL(INIT,          mdt_sec_ctx_handle),
4125         DEF_SEC_CTX_HNDL(INIT_CONT,     mdt_sec_ctx_handle),
4126         DEF_SEC_CTX_HNDL(FINI,          mdt_sec_ctx_handle)
4127 };
4128
/*
 * Opcode slices for regular requests.
 *
 * Each entry names an opcode range (mos_opc_start, mos_opc_end) and the
 * handler table (mos_hs) attached to it. Five slices are listed: MDS, OBD,
 * LDLM, LLOG and SEC_CTX. The list is terminated by an entry whose mos_hs
 * is NULL.
 *
 * NOTE(review): whether mos_opc_end is inclusive or exclusive is decided by
 * the lookup code, which is outside this part of the file.
 */
4129 static struct mdt_opc_slice mdt_regular_handlers[] = {
4130         {
4131                 .mos_opc_start = MDS_GETATTR,
4132                 .mos_opc_end   = MDS_LAST_OPC,
4133                 .mos_hs        = mdt_mds_ops
4134         },
4135         {
4136                 .mos_opc_start = OBD_PING,
4137                 .mos_opc_end   = OBD_LAST_OPC,
4138                 .mos_hs        = mdt_obd_ops
4139         },
4140         {
4141                 .mos_opc_start = LDLM_ENQUEUE,
4142                 .mos_opc_end   = LDLM_LAST_OPC,
4143                 .mos_hs        = mdt_dlm_ops
4144         },
4145         {
4146                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
4147                 .mos_opc_end   = LLOG_LAST_OPC,
4148                 .mos_hs        = mdt_llog_ops
4149         },
4150         {
4151                 .mos_opc_start = SEC_CTX_INIT,
4152                 .mos_opc_end   = SEC_LAST_OPC,
4153                 .mos_hs        = mdt_sec_ctx_ops
4154         },
4155         {
4156                 .mos_hs        = NULL
4157         }
4158 };
4159
/*
 * Handler table attached to mdt_readpage_handlers.
 *
 * It lists READPAGE, WRITEPAGE (only when HAVE_SPLIT_SUPPORT is defined),
 * and also CLOSE and DONE_WRITING. The last two are declared with
 * HABEO_CORPUS and the functions mdt_close and mdt_done_writing; see the
 * XXX note below for why they appear here.
 */
4160 static struct mdt_handler mdt_readpage_ops[] = {
4161         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
4162 #ifdef HAVE_SPLIT_SUPPORT
4163         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
4164 #endif
4165
4166         /*
4167          * XXX: this is ugly and should be fixed one day, see mdc_close() for
4168          * detailed comments. --umka
4169          */
4170         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
4171         DEF_MDT_HNDL_F(HABEO_CORPUS,              DONE_WRITING,    mdt_done_writing),
4172 };
4173
/*
 * Opcode slices for the readpage handlers.
 *
 * A single slice covers MDS_GETATTR .. MDS_LAST_OPC, the same range as the
 * first slice of mdt_regular_handlers, but it is attached to
 * mdt_readpage_ops instead of mdt_mds_ops. The list is terminated by an
 * entry whose mos_hs is NULL.
 */
4174 static struct mdt_opc_slice mdt_readpage_handlers[] = {
4175         {
4176                 .mos_opc_start = MDS_GETATTR,
4177                 .mos_opc_end   = MDS_LAST_OPC,
4178                 .mos_hs        = mdt_readpage_ops
4179         },
4180         {
4181                 .mos_hs        = NULL
4182         }
4183 };
4184
/*
 * Handler table for the SEQ opcodes: a single QUERY entry with flags 0,
 * served by seq_query. DEF_SEQ_HNDL_F is defined outside this part of the
 * file.
 *
 * NOTE(review): seq_query is cast to int (*)(struct mdt_thread_info *),
 * which suggests its declared type differs. Calling a function through an
 * incompatible pointer type is undefined behavior in ISO C; confirm that
 * the real signature is call-compatible.
 */
4185 static struct mdt_handler mdt_seq_ops[] = {
4186         DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
4187 };
4188
/*
 * Opcode slices for the SEQ handlers: one slice, SEQ_QUERY .. SEQ_LAST_OPC,
 * attached to mdt_seq_ops. The list is terminated by an entry whose mos_hs
 * is NULL.
 */
4189 static struct mdt_opc_slice mdt_seq_handlers[] = {
4190         {
4191                 .mos_opc_start = SEQ_QUERY,
4192                 .mos_opc_end   = SEQ_LAST_OPC,
4193                 .mos_hs        = mdt_seq_ops
4194         },
4195         {
4196                 .mos_hs        = NULL
4197         }
4198 };
4199
/*
 * Handler table for the FLD opcodes: a single QUERY entry with flags 0,
 * served by fld_query. DEF_FLD_HNDL_F is defined outside this part of the
 * file.
 *
 * NOTE(review): fld_query is cast to int (*)(struct mdt_thread_info *),
 * which suggests its declared type differs. Calling a function through an
 * incompatible pointer type is undefined behavior in ISO C; confirm that
 * the real signature is call-compatible.
 */
4200 static struct mdt_handler mdt_fld_ops[] = {
4201         DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
4202 };
4203
/*
 * Opcode slices for the FLD handlers: one slice, FLD_QUERY .. FLD_LAST_OPC,
 * attached to mdt_fld_ops. The list is terminated by an entry whose mos_hs
 * is NULL.
 */
4204 static struct mdt_opc_slice mdt_fld_handlers[] = {
4205         {
4206                 .mos_opc_start = FLD_QUERY,
4207                 .mos_opc_end   = FLD_LAST_OPC,
4208                 .mos_hs        = mdt_fld_ops
4209         },
4210         {
4211                 .mos_hs        = NULL
4212         }
4213 };
4214
/*
 * Module metadata and registration.
 *
 * The author, description and license strings are declared first. The
 * description embeds LUSTRE_MDT_NAME by string-literal concatenation.
 */
4215 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4216 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
4217 MODULE_LICENSE("GPL");
4218
/*
 * mdt_num_threads is exposed as a module parameter of type ulong ("ul")
 * with mode 0444 (presumably a read-only permission mask; confirm against
 * CFS_MODULE_PARM).
 */
4219 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
4220                 "number of mdt service threads to start");
4221
/*
 * Declares the module "mdt" with version string "0.2.0", naming
 * mdt_mod_init and mdt_mod_exit (presumably the load and unload hooks;
 * confirm against cfs_module).
 */
4222 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);