Whamcloud - gitweb
b3648819f9f534990987a44b810b2b3e92de6793
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 #include <lustre_fid.h>
55 #include <md_object.h>
56 #include <lprocfs_status.h>
57 #include <lustre_param.h>
58 #include "mdc_internal.h"
59 #include <lustre/lustre_idl.h>
60
61 #define REQUEST_MINOR 244
62
63 static quota_interface_t *quota_interface;
64 extern quota_interface_t mdc_quota_interface;
65
66 static int mdc_cleanup(struct obd_device *obd);
67
68 int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
69                     const struct req_msg_field *field, struct obd_capa **oc)
70 {
71         struct lustre_capa *capa;
72         struct obd_capa *c;
73         ENTRY;
74
75         /* swabbed already in mdc_enqueue */
76         capa = req_capsule_server_get(&req->rq_pill, field);
77         if (capa == NULL)
78                 RETURN(-EPROTO);
79
80         c = alloc_capa(CAPA_SITE_CLIENT);
81         if (IS_ERR(c)) {
82                 CDEBUG(D_INFO, "alloc capa failed!\n");
83                 RETURN(PTR_ERR(c));
84         } else {
85                 c->c_capa = *capa;
86                 *oc = c;
87                 RETURN(0);
88         }
89 }
90
91 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
92 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
93 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
94                           struct obd_capa **pc, int level, int msg_flags)
95 {
96         struct ptlrpc_request *req;
97         struct mdt_body       *body;
98         int                    rc;
99         ENTRY;
100
101         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS,
102                                         LUSTRE_MDS_VERSION, MDS_GETSTATUS);
103         if (req == NULL)
104                 RETURN(-ENOMEM);
105
106         mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0);
107         lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
108         req->rq_send_state = level;
109
110         ptlrpc_request_set_replen(req);
111
112         rc = ptlrpc_queue_wait(req);
113         if (rc)
114                 GOTO(out, rc);
115
116         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
117         if (body == NULL)
118                 GOTO(out, rc = -EPROTO);
119
120         if (body->valid & OBD_MD_FLMDSCAPA) {
121                 rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, pc);
122                 if (rc)
123                         GOTO(out, rc);
124         }
125
126         *rootfid = body->fid1;
127         CDEBUG(D_NET,
128                "root fid="DFID", last_committed="LPU64"\n",
129                PFID(rootfid),
130                lustre_msg_get_last_committed(req->rq_repmsg));
131         EXIT;
132 out:
133         ptlrpc_req_finished(req);
134         return rc;
135 }
136
137 /* This should be mdc_get_info("rootfid") */
138 int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
139                   struct obd_capa **pc)
140 {
141         return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
142                               LUSTRE_IMP_FULL, 0);
143 }
144
145 /*
146  * This function now is known to always saying that it will receive 4 buffers
147  * from server. Even for cases when acl_size and md_size is zero, RPC header
148  * will contain 4 fields and RPC itself will contain zero size fields. This is
149  * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
150  * and thus zero, it shrinks it, making zero size. The same story about
151  * md_size. And this is course of problem when client waits for smaller number
152  * of fields. This issue will be fixed later when client gets aware of RPC
153  * layouts.  --umka
154  */
155 static int mdc_getattr_common(struct obd_export *exp,
156                               struct ptlrpc_request *req)
157 {
158         struct req_capsule *pill = &req->rq_pill;
159         struct mdt_body    *body;
160         void               *eadata;
161         int                 rc;
162         ENTRY;
163
164         /* Request message already built. */
165         rc = ptlrpc_queue_wait(req);
166         if (rc != 0)
167                 RETURN(rc);
168
169         /* sanity check for the reply */
170         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
171         if (body == NULL)
172                 RETURN(-EPROTO);
173
174         CDEBUG(D_NET, "mode: %o\n", body->mode);
175
176         if (body->eadatasize != 0) {
177                 mdc_update_max_ea_from_body(exp, body);
178
179                 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
180                                                       body->eadatasize);
181                 if (eadata == NULL)
182                         RETURN(-EPROTO);
183         }
184
185         if (body->valid & OBD_MD_FLRMTPERM) {
186                 struct mdt_remote_perm *perm;
187
188                 LASSERT(client_is_remote(exp));
189                 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
190                                                 lustre_swab_mdt_remote_perm);
191                 if (perm == NULL)
192                         RETURN(-EPROTO);
193         }
194
195         if (body->valid & OBD_MD_FLMDSCAPA) {
196                 struct lustre_capa *capa;
197                 capa = req_capsule_server_get(pill, &RMF_CAPA1);
198                 if (capa == NULL)
199                         RETURN(-EPROTO);
200         }
201
202         RETURN(0);
203 }
204
205 int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid,
206                 struct obd_capa *oc, obd_valid valid, int ea_size,
207                 struct ptlrpc_request **request)
208 {
209         struct ptlrpc_request *req;
210         int                    rc;
211         ENTRY;
212
213         *request = NULL;
214         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
215         if (req == NULL)
216                 RETURN(-ENOMEM);
217
218         mdc_set_capa_size(req, &RMF_CAPA1, oc);
219
220         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
221         if (rc) {
222                 ptlrpc_request_free(req);
223                 RETURN(rc);
224         }
225
226         mdc_pack_body(req, fid, oc, valid, ea_size, -1, 0);
227
228         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, ea_size);
229         if (valid & OBD_MD_FLRMTPERM) {
230                 LASSERT(client_is_remote(exp));
231                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
232                                      sizeof(struct mdt_remote_perm));
233         }
234         ptlrpc_request_set_replen(req);
235
236         rc = mdc_getattr_common(exp, req);
237         if (rc)
238                 ptlrpc_req_finished(req);
239         else
240                 *request = req;
241         RETURN(rc);
242 }
243
244 int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
245                      struct obd_capa *oc, const char *filename, int namelen,
246                      obd_valid valid, int ea_size, __u32 suppgid,
247                      struct ptlrpc_request **request)
248 {
249         struct ptlrpc_request *req;
250         int                    rc;
251         ENTRY;
252
253         *request = NULL;
254         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
255                                    &RQF_MDS_GETATTR_NAME);
256         if (req == NULL)
257                 RETURN(-ENOMEM);
258
259         mdc_set_capa_size(req, &RMF_CAPA1, oc);
260         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, namelen);
261
262         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
263         if (rc) {
264                 ptlrpc_request_free(req);
265                 RETURN(rc);
266         }
267
268         mdc_pack_body(req, fid, oc, valid, ea_size, suppgid, 0);
269
270         if (filename) {
271                 char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
272                 LASSERT(strnlen(filename, namelen) == namelen - 1);
273                 memcpy(name, filename, namelen);
274         }
275
276         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, ea_size);
277         ptlrpc_request_set_replen(req);
278
279         rc = mdc_getattr_common(exp, req);
280         if (rc)
281                 ptlrpc_req_finished(req);
282         else
283                 *request = req;
284         RETURN(rc);
285 }
286
287 static int mdc_is_subdir(struct obd_export *exp,
288                          const struct lu_fid *pfid,
289                          const struct lu_fid *cfid,
290                          struct ptlrpc_request **request)
291 {
292         struct ptlrpc_request  *req;
293         int                     rc;
294
295         ENTRY;
296
297         *request = NULL;
298         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
299                                         &RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION,
300                                         MDS_IS_SUBDIR);
301         if (req == NULL)
302                 RETURN(-ENOMEM);
303
304         mdc_is_subdir_pack(req, pfid, cfid, 0);
305         ptlrpc_request_set_replen(req);
306
307         rc = ptlrpc_queue_wait(req);
308         if (rc && rc != -EREMOTE)
309                 ptlrpc_req_finished(req);
310         else
311                 *request = req;
312         RETURN(rc);
313 }
314
315 static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
316                             const struct lu_fid *fid,
317                             struct obd_capa *oc, int opcode, obd_valid valid,
318                             const char *xattr_name, const char *input,
319                             int input_size, int output_size, int flags,
320                             __u32 suppgid, struct ptlrpc_request **request)
321 {
322         struct ptlrpc_request *req;
323         int   xattr_namelen = 0;
324         char *tmp;
325         int   rc;
326         ENTRY;
327
328         *request = NULL;
329         req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
330         if (req == NULL)
331                 RETURN(-ENOMEM);
332
333         mdc_set_capa_size(req, &RMF_CAPA1, oc);
334         if (xattr_name) {
335                 xattr_namelen = strlen(xattr_name) + 1;
336                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
337                                      xattr_namelen);
338         }
339         if (input_size) {
340                 LASSERT(input);
341                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
342                                      input_size);
343         }
344
345         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
346         if (rc) {
347                 ptlrpc_request_free(req);
348                 RETURN(rc);
349         }
350
351         if (opcode == MDS_REINT) {
352                 struct mdt_rec_setxattr *rec;
353
354                 CLASSERT(sizeof(struct mdt_rec_setxattr) ==
355                          sizeof(struct mdt_rec_reint));
356                 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
357                 rec->sx_opcode = REINT_SETXATTR;
358                 /* TODO:
359                  *  cfs_curproc_fs{u,g}id() should replace
360                  *  current->fs{u,g}id for portability.
361                  */
362                 rec->sx_fsuid  = cfs_curproc_fsuid();
363                 rec->sx_fsgid  = cfs_curproc_fsgid();
364                 rec->sx_cap    = cfs_curproc_cap_pack();
365                 rec->sx_suppgid1 = suppgid;
366                 rec->sx_suppgid2 = -1;
367                 rec->sx_fid    = *fid;
368                 rec->sx_valid  = valid | OBD_MD_FLCTIME;
369                 rec->sx_time   = cfs_time_current_sec();
370                 rec->sx_size   = output_size;
371                 rec->sx_flags  = flags;
372
373                 mdc_pack_capa(req, &RMF_CAPA1, oc);
374         } else {
375                 mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags);
376         }
377
378         if (xattr_name) {
379                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
380                 memcpy(tmp, xattr_name, xattr_namelen);
381         }
382         if (input_size) {
383                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
384                 memcpy(tmp, input, input_size);
385         }
386
387         if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
388                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
389                                      RCL_SERVER, output_size);
390         ptlrpc_request_set_replen(req);
391
392         /* make rpc */
393         if (opcode == MDS_REINT)
394                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
395
396         rc = ptlrpc_queue_wait(req);
397
398         if (opcode == MDS_REINT)
399                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
400
401         if (rc)
402                 ptlrpc_req_finished(req);
403         else
404                 *request = req;
405         RETURN(rc);
406 }
407
408 int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
409                  struct obd_capa *oc, obd_valid valid, const char *xattr_name,
410                  const char *input, int input_size, int output_size,
411                  int flags, __u32 suppgid, struct ptlrpc_request **request)
412 {
413         return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
414                                 fid, oc, MDS_REINT, valid, xattr_name,
415                                 input, input_size, output_size, flags,
416                                 suppgid, request);
417 }
418
419 int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
420                  struct obd_capa *oc, obd_valid valid, const char *xattr_name,
421                  const char *input, int input_size, int output_size,
422                  int flags, struct ptlrpc_request **request)
423 {
424         return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
425                                 fid, oc, MDS_GETXATTR, valid, xattr_name,
426                                 input, input_size, output_size, flags,
427                                 -1, request);
428 }
429
#ifdef CONFIG_FS_POSIX_ACL
/**
 * Convert the ACL carried in the RMF_ACL reply field of \a req into a
 * posix_acl and store it in \a md->posix_acl.
 *
 * Returns 0 without touching \a md when the body reports a zero aclsize,
 * -EPROTO when the field cannot be fetched, or the error from
 * posix_acl_from_xattr() / posix_acl_valid().  An ACL that fails validation
 * is released before returning.
 */
static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
{
        struct mdt_body  *body = md->body;
        struct posix_acl *acl;
        void             *xattr;
        int               rc;
        ENTRY;

        if (body->aclsize == 0)
                RETURN(0);

        xattr = req_capsule_server_sized_get(&req->rq_pill, &RMF_ACL,
                                             body->aclsize);
        if (xattr == NULL)
                RETURN(-EPROTO);

        acl = posix_acl_from_xattr(xattr, body->aclsize);
        if (IS_ERR(acl)) {
                rc = PTR_ERR(acl);
                CERROR("convert xattr to acl: %d\n", rc);
                RETURN(rc);
        }

        rc = posix_acl_valid(acl);
        if (rc != 0) {
                CERROR("validate acl: %d\n", rc);
                posix_acl_release(acl);
                RETURN(rc);
        }

        md->posix_acl = acl;
        RETURN(0);
}
#else
/* Built without CONFIG_FS_POSIX_ACL: unpacking is a no-op yielding 0. */
#define mdc_unpack_acl(req, md) 0
#endif
468
469 int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
470                       struct obd_export *dt_exp, struct obd_export *md_exp,
471                       struct lustre_md *md)
472 {
473         struct req_capsule *pill = &req->rq_pill;
474         int rc;
475         ENTRY;
476
477         LASSERT(md);
478         memset(md, 0, sizeof(*md));
479
480         md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
481         LASSERT(md->body != NULL);
482
483         if (md->body->valid & OBD_MD_FLEASIZE) {
484                 int lmmsize;
485                 struct lov_mds_md *lmm;
486
487                 if (!S_ISREG(md->body->mode)) {
488                         CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, should be a "
489                                "regular file, but is not\n");
490                         GOTO(out, rc = -EPROTO);
491                 }
492
493                 if (md->body->eadatasize == 0) {
494                         CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, "
495                                "but eadatasize 0\n");
496                         GOTO(out, rc = -EPROTO);
497                 }
498                 lmmsize = md->body->eadatasize;
499                 lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
500                 if (!lmm)
501                         GOTO(out, rc = -EPROTO);
502
503                 rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
504                 if (rc < 0)
505                         GOTO(out, rc);
506
507                 if (rc < sizeof(*md->lsm)) {
508                         CDEBUG(D_INFO, "lsm size too small: "
509                                "rc < sizeof (*md->lsm) (%d < %d)\n",
510                                rc, (int)sizeof(*md->lsm));
511                         GOTO(out, rc = -EPROTO);
512                 }
513
514         } else if (md->body->valid & OBD_MD_FLDIREA) {
515                 int lmvsize;
516                 struct lov_mds_md *lmv;
517
518                 if(!S_ISDIR(md->body->mode)) {
519                         CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a "
520                                "directory, but is not\n");
521                         GOTO(out, rc = -EPROTO);
522                 }
523
524                 if (md->body->eadatasize == 0) {
525                         CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, "
526                                "but eadatasize 0\n");
527                         RETURN(-EPROTO);
528                 }
529                 if (md->body->valid & OBD_MD_MEA) {
530                         lmvsize = md->body->eadatasize;
531                         lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
532                                                            lmvsize);
533                         if (!lmv)
534                                 GOTO(out, rc = -EPROTO);
535
536                         rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv,
537                                           lmvsize);
538                         if (rc < 0)
539                                 GOTO(out, rc);
540
541                         if (rc < sizeof(*md->mea)) {
542                                 CDEBUG(D_INFO, "size too small:  "
543                                        "rc < sizeof(*md->mea) (%d < %d)\n",
544                                         rc, (int)sizeof(*md->mea));
545                                 GOTO(out, rc = -EPROTO);
546                         }
547                 }
548         }
549         rc = 0;
550
551         if (md->body->valid & OBD_MD_FLRMTPERM) {
552                 /* remote permission */
553                 LASSERT(client_is_remote(exp));
554                 md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL,
555                                                 lustre_swab_mdt_remote_perm);
556                 if (!md->remote_perm)
557                         GOTO(out, rc = -EPROTO);
558         }
559         else if (md->body->valid & OBD_MD_FLACL) {
560                 /* for ACL, it's possible that FLACL is set but aclsize is zero.
561                  * only when aclsize != 0 there's an actual segment for ACL
562                  * in reply buffer.
563                  */
564                 if (md->body->aclsize) {
565                         rc = mdc_unpack_acl(req, md);
566                         if (rc)
567                                 GOTO(out, rc);
568 #ifdef CONFIG_FS_POSIX_ACL
569                 } else {
570                         md->posix_acl = NULL;
571 #endif
572                 }
573         }
574         if (md->body->valid & OBD_MD_FLMDSCAPA) {
575                 struct obd_capa *oc = NULL;
576
577                 rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, &oc);
578                 if (rc)
579                         GOTO(out, rc);
580                 md->mds_capa = oc;
581         }
582
583         if (md->body->valid & OBD_MD_FLOSSCAPA) {
584                 struct obd_capa *oc = NULL;
585
586                 rc = mdc_unpack_capa(NULL, req, &RMF_CAPA2, &oc);
587                 if (rc)
588                         GOTO(out, rc);
589                 md->oss_capa = oc;
590         }
591
592         EXIT;
593 out:
594         if (rc) {
595                 if (md->oss_capa) {
596                         capa_put(md->oss_capa);
597                         md->oss_capa = NULL;
598                 }
599                 if (md->mds_capa) {
600                         capa_put(md->mds_capa);
601                         md->mds_capa = NULL;
602                 }
603 #ifdef CONFIG_FS_POSIX_ACL
604                 posix_acl_release(md->posix_acl);
605 #endif
606                 if (md->lsm)
607                         obd_free_memmd(dt_exp, &md->lsm);
608         }
609         return rc;
610 }
611
612 int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
613 {
614         ENTRY;
615         RETURN(0);
616 }
617
618 /**
619  * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
620  * RPC chains.
621  */
622 void mdc_replay_open(struct ptlrpc_request *req)
623 {
624         struct md_open_data *mod = req->rq_cb_data;
625         struct ptlrpc_request *close_req;
626         struct obd_client_handle *och;
627         struct lustre_handle old;
628         struct mdt_body *body;
629         ENTRY;
630
631         if (mod == NULL) {
632                 DEBUG_REQ(D_ERROR, req,
633                           "Can't properly replay without open data.");
634                 EXIT;
635                 return;
636         }
637
638         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
639         LASSERT(body != NULL);
640
641         och = mod->mod_och;
642         if (och != NULL) {
643                 struct lustre_handle *file_fh;
644
645                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
646
647                 file_fh = &och->och_fh;
648                 CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n",
649                        file_fh->cookie, body->handle.cookie);
650                 old = *file_fh;
651                 *file_fh = body->handle;
652         }
653         close_req = mod->mod_close_req;
654         if (close_req != NULL) {
655                 __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
656                 struct mdt_ioepoch *epoch;
657
658                 LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
659                 epoch = req_capsule_client_get(&close_req->rq_pill,
660                                                &RMF_MDT_EPOCH);
661                 LASSERT(epoch);
662
663                 if (och != NULL)
664                         LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
665                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
666                 epoch->handle = body->handle;
667         }
668         EXIT;
669 }
670
671 void mdc_commit_open(struct ptlrpc_request *req)
672 {
673         struct md_open_data *mod = req->rq_cb_data;
674         if (mod == NULL)
675                 return;
676
677         /**
678          * No need to touch md_open_data::mod_och, it holds a reference on
679          * \var mod and will zero references to each other, \var mod will be
680          * freed after that when md_open_data::mod_och will put the reference.
681          */
682
683         /**
684          * Do not let open request to disappear as it still may be needed
685          * for close rpc to happen (it may happen on evict only, otherwise
686          * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
687          * called), just mark this rpc as committed to distinguish these 2
688          * cases, see mdc_close() for details. The open request reference will
689          * be put along with freeing \var mod.
690          */
691         ptlrpc_request_addref(req);
692         cfs_spin_lock(&req->rq_lock);
693         req->rq_committed = 1;
694         cfs_spin_unlock(&req->rq_lock);
695         req->rq_cb_data = NULL;
696         obd_mod_put(mod);
697 }
698
699 int mdc_set_open_replay_data(struct obd_export *exp,
700                              struct obd_client_handle *och,
701                              struct ptlrpc_request *open_req)
702 {
703         struct md_open_data   *mod;
704         struct mdt_rec_create *rec;
705         struct mdt_body       *body;
706         struct obd_import     *imp = open_req->rq_import;
707         ENTRY;
708
709         if (!open_req->rq_replay)
710                 RETURN(0);
711
712         rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
713         body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
714         LASSERT(rec != NULL);
715         /* Incoming message in my byte order (it's been swabbed). */
716         /* Outgoing messages always in my byte order. */
717         LASSERT(body != NULL);
718
719         /* Only if the import is replayable, we set replay_open data */
720         if (och && imp->imp_replayable) {
721                 mod = obd_mod_alloc();
722                 if (mod == NULL) {
723                         DEBUG_REQ(D_ERROR, open_req,
724                                   "Can't allocate md_open_data");
725                         RETURN(0);
726                 }
727
728                 /**
729                  * Take a reference on \var mod, to be freed on mdc_close().
730                  * It protects \var mod from being freed on eviction (commit
731                  * callback is called despite rq_replay flag).
732                  * Another reference for \var och.
733                  */
734                 obd_mod_get(mod);
735                 obd_mod_get(mod);
736
737                 cfs_spin_lock(&open_req->rq_lock);
738                 och->och_mod = mod;
739                 mod->mod_och = och;
740                 mod->mod_open_req = open_req;
741                 open_req->rq_cb_data = mod;
742                 open_req->rq_commit_cb = mdc_commit_open;
743                 cfs_spin_unlock(&open_req->rq_lock);
744         }
745
746         rec->cr_fid2 = body->fid1;
747         rec->cr_ioepoch = body->ioepoch;
748         rec->cr_old_handle.cookie = body->handle.cookie;
749         open_req->rq_replay_cb = mdc_replay_open;
750         if (!fid_is_sane(&body->fid1)) {
751                 DEBUG_REQ(D_ERROR, open_req, "Saving replay request with "
752                           "insane fid");
753                 LBUG();
754         }
755
756         DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
757         RETURN(0);
758 }
759
760 int mdc_clear_open_replay_data(struct obd_export *exp,
761                                struct obd_client_handle *och)
762 {
763         struct md_open_data *mod = och->och_mod;
764         ENTRY;
765
766         LASSERT(mod != LP_POISON && mod != NULL);
767
768         mod->mod_och = NULL;
769         och->och_mod = NULL;
770         obd_mod_put(mod);
771
772         RETURN(0);
773 }
774
775 /* Prepares the request for the replay by the given reply */
776 static void mdc_close_handle_reply(struct ptlrpc_request *req,
777                                    struct md_op_data *op_data, int rc) {
778         struct mdt_body  *repbody;
779         struct mdt_ioepoch *epoch;
780
781         if (req && rc == -EAGAIN) {
782                 repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
783                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
784
785                 epoch->flags |= MF_SOM_AU;
786                 if (repbody->valid & OBD_MD_FLGETATTRLOCK)
787                         op_data->op_flags |= MF_GETATTR_LOCK;
788         }
789 }
790
791 int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
792               struct md_open_data *mod, struct ptlrpc_request **request)
793 {
794         struct obd_device     *obd = class_exp2obd(exp);
795         struct ptlrpc_request *req;
796         int                    rc;
797         ENTRY;
798
799         *request = NULL;
800         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_CLOSE);
801         if (req == NULL)
802                 RETURN(-ENOMEM);
803
804         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
805
806         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
807         if (rc) {
808                 ptlrpc_request_free(req);
809                 RETURN(rc);
810         }
811
812         /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
813          * portal whose threads are not taking any DLM locks and are therefore
814          * always progressing */
815         req->rq_request_portal = MDS_READPAGE_PORTAL;
816         ptlrpc_at_set_req_timeout(req);
817
818         /* Ensure that this close's handle is fixed up during replay. */
819         if (likely(mod != NULL)) {
820                 LASSERTF(mod->mod_open_req != NULL &&
821                          mod->mod_open_req->rq_type != LI_POISON,
822                          "POISONED open %p!\n", mod->mod_open_req);
823
824                 mod->mod_close_req = req;
825
826                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
827                 /* We no longer want to preserve this open for replay even
828                  * though the open was committed. b=3632, b=3633 */
829                 cfs_spin_lock(&mod->mod_open_req->rq_lock);
830                 mod->mod_open_req->rq_replay = 0;
831                 cfs_spin_unlock(&mod->mod_open_req->rq_lock);
832         } else {
833                  CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
834         }
835
836         mdc_close_pack(req, op_data);
837
838         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
839                              obd->u.cli.cl_max_mds_easize);
840         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
841                              obd->u.cli.cl_max_mds_cookiesize);
842
843         ptlrpc_request_set_replen(req);
844
845         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
846         rc = ptlrpc_queue_wait(req);
847         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
848
849         if (req->rq_repmsg == NULL) {
850                 CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
851                        req->rq_status);
852                 if (rc == 0)
853                         rc = req->rq_status ?: -EIO;
854         } else if (rc == 0 || rc == -EAGAIN) {
855                 struct mdt_body *body;
856
857                 rc = lustre_msg_get_status(req->rq_repmsg);
858                 if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
859                         DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
860                                   "= %d", rc);
861                         if (rc > 0)
862                                 rc = -rc;
863                 }
864                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
865                 if (body == NULL)
866                         rc = -EPROTO;
867         } else if (rc == -ESTALE) {
868                 /**
869                  * it can be allowed error after 3633 if open was committed and
870                  * server failed before close was sent. Let's check if mod
871                  * exists and return no error in that case
872                  */
873                 if (mod) {
874                         LASSERT(mod->mod_open_req != NULL);
875                         if (mod->mod_open_req->rq_committed)
876                                 rc = 0;
877                 }
878         }
879
880         if (mod) {
881                 if (rc != 0)
882                         mod->mod_close_req = NULL;
883                 /* Since now, mod is accessed through open_req only,
884                  * thus close req does not keep a reference on mod anymore. */
885                 obd_mod_put(mod);
886         }
887         *request = req;
888         mdc_close_handle_reply(req, op_data, rc);
889         RETURN(rc);
890 }
891
892 int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
893                      struct md_open_data *mod)
894 {
895         struct obd_device     *obd = class_exp2obd(exp);
896         struct ptlrpc_request *req;
897         int                    rc;
898         ENTRY;
899
900         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
901                                    &RQF_MDS_DONE_WRITING);
902         if (req == NULL)
903                 RETURN(-ENOMEM);
904
905         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
906         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
907         if (rc) {
908                 ptlrpc_request_free(req);
909                 RETURN(rc);
910         }
911
912         if (mod != NULL) {
913                 LASSERTF(mod->mod_open_req != NULL &&
914                          mod->mod_open_req->rq_type != LI_POISON,
915                          "POISONED setattr %p!\n", mod->mod_open_req);
916
917                 mod->mod_close_req = req;
918                 DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
919                 /* We no longer want to preserve this setattr for replay even
920                  * though the open was committed. b=3632, b=3633 */
921                 cfs_spin_lock(&mod->mod_open_req->rq_lock);
922                 mod->mod_open_req->rq_replay = 0;
923                 cfs_spin_unlock(&mod->mod_open_req->rq_lock);
924         }
925
926         mdc_close_pack(req, op_data);
927         ptlrpc_request_set_replen(req);
928
929         mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
930         rc = ptlrpc_queue_wait(req);
931         mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
932
933         if (rc == -ESTALE) {
934                 /**
935                  * it can be allowed error after 3633 if open or setattr were
936                  * committed and server failed before close was sent.
937                  * Let's check if mod exists and return no error in that case
938                  */
939                 if (mod) {
940                         LASSERT(mod->mod_open_req != NULL);
941                         if (mod->mod_open_req->rq_committed)
942                                 rc = 0;
943                 }
944         }
945
946         if (mod) {
947                 if (rc != 0)
948                         mod->mod_close_req = NULL;
949                 /* Since now, mod is accessed through setattr req only,
950                  * thus DW req does not keep a reference on mod anymore. */
951                 obd_mod_put(mod);
952         }
953
954         mdc_close_handle_reply(req, op_data, rc);
955         ptlrpc_req_finished(req);
956         RETURN(rc);
957 }
958
959 #ifdef HAVE_SPLIT_SUPPORT
960 int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
961                  const struct page *page, int offset)
962 {
963         struct ptlrpc_request   *req;
964         struct ptlrpc_bulk_desc *desc;
965         int                      rc;
966         ENTRY;
967
968         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_WRITEPAGE);
969         if (req == NULL)
970                 RETURN(-ENOMEM);
971
972         /* FIXME: capa doesn't support split yet */
973         mdc_set_capa_size(req, &RMF_CAPA1, NULL);
974
975         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_WRITEPAGE);
976         if (rc) {
977                 ptlrpc_request_free(req);
978                 RETURN(rc);
979         }
980
981         req->rq_request_portal = MDS_READPAGE_PORTAL;
982         ptlrpc_at_set_req_timeout(req);
983
984         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
985         if (desc == NULL)
986                 GOTO(out, rc = -ENOMEM);
987
988         /* NB req now owns desc and will free it when it gets freed. */
989         ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
990         mdc_readdir_pack(req, 0, offset, fid, NULL);
991
992         ptlrpc_request_set_replen(req);
993         rc = ptlrpc_queue_wait(req);
994         if (rc)
995                 GOTO(out, rc);
996
997         rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
998 out:
999         ptlrpc_req_finished(req);
1000         return rc;
1001 }
1002 EXPORT_SYMBOL(mdc_sendpage);
1003 #endif
1004
1005 int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
1006                  struct obd_capa *oc, __u64 offset, struct page *page,
1007                  struct ptlrpc_request **request)
1008 {
1009         struct ptlrpc_request   *req;
1010         struct ptlrpc_bulk_desc *desc;
1011         int                      rc;
1012         ENTRY;
1013
1014         *request = NULL;
1015         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
1016         if (req == NULL)
1017                 RETURN(-ENOMEM);
1018
1019         mdc_set_capa_size(req, &RMF_CAPA1, oc);
1020
1021         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
1022         if (rc) {
1023                 ptlrpc_request_free(req);
1024                 RETURN(rc);
1025         }
1026
1027         req->rq_request_portal = MDS_READPAGE_PORTAL;
1028         ptlrpc_at_set_req_timeout(req);
1029
1030         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
1031         if (desc == NULL) {
1032                 ptlrpc_request_free(req);
1033                 RETURN(-ENOMEM);
1034         }
1035
1036         /* NB req now owns desc and will free it when it gets freed */
1037         ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
1038         mdc_readdir_pack(req, offset, CFS_PAGE_SIZE, fid, oc);
1039
1040         ptlrpc_request_set_replen(req);
1041         rc = ptlrpc_queue_wait(req);
1042         if (rc) {
1043                 ptlrpc_req_finished(req);
1044                 RETURN(rc);
1045         }
1046
1047         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1048                                           req->rq_bulk->bd_nob_transferred);
1049         if (rc < 0) {
1050                 ptlrpc_req_finished(req);
1051                 RETURN(rc);
1052         }
1053
1054         if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) {
1055                 CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
1056                         req->rq_bulk->bd_nob_transferred, CFS_PAGE_SIZE);
1057                 ptlrpc_req_finished(req);
1058                 RETURN(-EPROTO);
1059         }
1060
1061         *request = req;
1062         RETURN(0);
1063 }
1064
1065 static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1066 {
1067         __u32 keylen, vallen;
1068         void *key;
1069         int rc;
1070
1071         if (gf->gf_pathlen > PATH_MAX)
1072                 RETURN(-ENAMETOOLONG);
1073         if (gf->gf_pathlen < 2)
1074                 RETURN(-EOVERFLOW);
1075
1076         /* Key is KEY_FID2PATH + getinfo_fid2path description */
1077         keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf);
1078         OBD_ALLOC(key, keylen);
1079         if (key == NULL)
1080                 RETURN(-ENOMEM);
1081         memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1082         memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf));
1083
1084         CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
1085                PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1086
1087         if (!fid_is_sane(&gf->gf_fid))
1088                 GOTO(out, rc = -EINVAL);
1089
1090         /* Val is struct getinfo_fid2path result plus path */
1091         vallen = sizeof(*gf) + gf->gf_pathlen;
1092
1093         rc = obd_get_info(exp, keylen, key, &vallen, gf, NULL);
1094         if (rc)
1095                 GOTO(out, rc);
1096
1097         if (vallen <= sizeof(*gf))
1098                 GOTO(out, rc = -EPROTO);
1099         else if (vallen > sizeof(*gf) + gf->gf_pathlen)
1100                 GOTO(out, rc = -EOVERFLOW);
1101
1102         CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n%s\n",
1103                PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path);
1104
1105 out:
1106         OBD_FREE(key, keylen);
1107         return rc;
1108 }
1109
1110 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1111                          void *karg, void *uarg)
1112 {
1113         struct obd_device *obd = exp->exp_obd;
1114         struct obd_ioctl_data *data = karg;
1115         struct obd_import *imp = obd->u.cli.cl_import;
1116         struct llog_ctxt *ctxt;
1117         int rc;
1118         ENTRY;
1119
1120         if (!cfs_try_module_get(THIS_MODULE)) {
1121                 CERROR("Can't get module. Is it alive?");
1122                 return -EINVAL;
1123         }
1124         switch (cmd) {
1125         case OBD_IOC_CHANGELOG_CLEAR: {
1126                 struct ioc_changelog_clear *icc = karg;
1127                 struct changelog_setinfo cs =
1128                         {icc->icc_recno, icc->icc_id};
1129                 rc = obd_set_info_async(exp, strlen(KEY_CHANGELOG_CLEAR),
1130                                         KEY_CHANGELOG_CLEAR, sizeof(cs), &cs,
1131                                         NULL);
1132                 GOTO(out, rc);
1133         }
1134         case OBD_IOC_FID2PATH: {
1135                 rc = mdc_ioc_fid2path(exp, karg);
1136                 GOTO(out, rc);
1137         }
1138         case OBD_IOC_CLIENT_RECOVER:
1139                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1);
1140                 if (rc < 0)
1141                         GOTO(out, rc);
1142                 GOTO(out, rc = 0);
1143         case IOC_OSC_SET_ACTIVE:
1144                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
1145                 GOTO(out, rc);
1146         case OBD_IOC_PARSE: {
1147                 ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
1148                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
1149                 llog_ctxt_put(ctxt);
1150                 GOTO(out, rc);
1151         }
1152 #ifdef __KERNEL__
1153         case OBD_IOC_LLOG_INFO:
1154         case OBD_IOC_LLOG_PRINT: {
1155                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
1156                 rc = llog_ioctl(ctxt, cmd, data);
1157                 llog_ctxt_put(ctxt);
1158                 GOTO(out, rc);
1159         }
1160 #endif
1161         case OBD_IOC_POLL_QUOTACHECK:
1162                 rc = lquota_poll_check(quota_interface, exp,
1163                                        (struct if_quotacheck *)karg);
1164                 GOTO(out, rc);
1165         case OBD_IOC_PING_TARGET:
1166                 rc = ptlrpc_obd_ping(obd);
1167                 GOTO(out, rc);
1168         default:
1169                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
1170                 GOTO(out, rc = -ENOTTY);
1171         }
1172 out:
1173         cfs_module_put(THIS_MODULE);
1174
1175         return rc;
1176 }
1177
1178 int mdc_get_info_rpc(struct obd_export *exp,
1179                      obd_count keylen, void *key,
1180                      int vallen, void *val)
1181 {
1182         struct obd_import      *imp = class_exp2cliimp(exp);
1183         struct ptlrpc_request  *req;
1184         char                   *tmp;
1185         int                     rc = -EINVAL;
1186         ENTRY;
1187
1188         req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
1189         if (req == NULL)
1190                 RETURN(-ENOMEM);
1191
1192         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
1193                              RCL_CLIENT, keylen);
1194         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
1195                              RCL_CLIENT, sizeof(__u32));
1196
1197         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
1198         if (rc) {
1199                 ptlrpc_request_free(req);
1200                 RETURN(rc);
1201         }
1202
1203         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
1204         memcpy(tmp, key, keylen);
1205         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
1206         memcpy(tmp, &vallen, sizeof(__u32));
1207
1208         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
1209                              RCL_SERVER, vallen);
1210         ptlrpc_request_set_replen(req);
1211
1212         rc = ptlrpc_queue_wait(req);
1213         if (rc == 0) {
1214                 tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
1215                 memcpy(val, tmp, vallen);
1216                 if (ptlrpc_rep_need_swab(req)) {
1217                         if (KEY_IS(KEY_FID2PATH)) {
1218                                 lustre_swab_fid2path(val);
1219                         }
1220                 }
1221         }
1222         ptlrpc_req_finished(req);
1223
1224         RETURN(rc);
1225 }
1226
1227 static void lustre_swab_hai(struct hsm_action_item *h)
1228 {
1229         __swab32s(&h->hai_len);
1230         __swab32s(&h->hai_action);
1231         lustre_swab_lu_fid(&h->hai_fid);
1232         __swab64s(&h->hai_cookie);
1233         __swab64s(&h->hai_extent_start);
1234         __swab64s(&h->hai_extent_end);
1235         __swab64s(&h->hai_gid);
1236 }
1237
1238 static void lustre_swab_hal(struct hsm_action_list *h)
1239 {
1240         struct hsm_action_item *hai;
1241         int i;
1242
1243         __swab32s(&h->hal_version);
1244         __swab32s(&h->hal_count);
1245         __swab32s(&h->hal_archive_num);
1246         hai = hai_zero(h);
1247         for (i = 0; i < h->hal_count; i++) {
1248                 lustre_swab_hai(hai);
1249                 hai = hai_next(hai);
1250         }
1251 }
1252
1253 /**
1254  * Send a message to any listening copytools, nonblocking
1255  * @param val LNL message (lnl_hdr + hsm_action_list)
1256  * @param len total length of message
1257  */
1258 static int mdc_hsm_copytool_send(int len, void *val)
1259 {
1260         struct lnl_hdr *lh = (struct lnl_hdr *)val;
1261         struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1);
1262         int rc;
1263         ENTRY;
1264
1265         if (len < sizeof(*lh) + sizeof(*hal)) {
1266                 CERROR("Short HSM message %d < %d\n", len,
1267                       (int) (sizeof(*lh) + sizeof(*hal)));
1268                 RETURN(-EPROTO);
1269         }
1270         if (lh->lnl_magic == __swab16(LNL_MAGIC)) {
1271                 lustre_swab_lnlh(lh);
1272                 lustre_swab_hal(hal);
1273         } else if (lh->lnl_magic != LNL_MAGIC) {
1274                 CERROR("Bad magic %x!=%x\n", lh->lnl_magic, LNL_MAGIC);
1275                 RETURN(-EPROTO);
1276         }
1277
1278         CDEBUG(D_IOCTL, " Received message mg=%x t=%d m=%d l=%d actions=%d\n",
1279                lh->lnl_magic, lh->lnl_transport, lh->lnl_msgtype,
1280                lh->lnl_msglen, hal->hal_count);
1281
1282         /* Broadcast to HSM listeners */
1283         rc = libcfs_klnl_msg_put(0, LNL_GRP_HSM, lh);
1284
1285         RETURN(rc);
1286 }
1287
1288 int mdc_set_info_async(struct obd_export *exp,
1289                        obd_count keylen, void *key,
1290                        obd_count vallen, void *val,
1291                        struct ptlrpc_request_set *set)
1292 {
1293         struct obd_import *imp = class_exp2cliimp(exp);
1294         int                rc = -EINVAL;
1295         ENTRY;
1296
1297         if (KEY_IS(KEY_INIT_RECOV)) {
1298                 if (vallen != sizeof(int))
1299                         RETURN(-EINVAL);
1300                 cfs_spin_lock(&imp->imp_lock);
1301                 imp->imp_initial_recov = *(int *)val;
1302                 cfs_spin_unlock(&imp->imp_lock);
1303                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
1304                        exp->exp_obd->obd_name, imp->imp_initial_recov);
1305                 RETURN(0);
1306         }
1307         /* Turn off initial_recov after we try all backup servers once */
1308         if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
1309                 if (vallen != sizeof(int))
1310                         RETURN(-EINVAL);
1311                 cfs_spin_lock(&imp->imp_lock);
1312                 imp->imp_initial_recov_bk = *(int *)val;
1313                 if (imp->imp_initial_recov_bk)
1314                         imp->imp_initial_recov = 1;
1315                 cfs_spin_unlock(&imp->imp_lock);
1316                 CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n",
1317                        exp->exp_obd->obd_name, imp->imp_initial_recov_bk);
1318                 RETURN(0);
1319         }
1320         if (KEY_IS(KEY_READ_ONLY)) {
1321                 if (vallen != sizeof(int))
1322                         RETURN(-EINVAL);
1323
1324                 cfs_spin_lock(&imp->imp_lock);
1325                 if (*((int *)val)) {
1326                         imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
1327                         imp->imp_connect_data.ocd_connect_flags |= OBD_CONNECT_RDONLY;
1328                 } else {
1329                         imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
1330                         imp->imp_connect_data.ocd_connect_flags &= ~OBD_CONNECT_RDONLY;
1331                 }
1332                 cfs_spin_unlock(&imp->imp_lock);
1333
1334                 rc = target_set_info_rpc(imp, MDS_SET_INFO,
1335                                          keylen, key, vallen, val, set);
1336                 RETURN(rc);
1337         }
1338         if (KEY_IS(KEY_SPTLRPC_CONF)) {
1339                 sptlrpc_conf_client_adapt(exp->exp_obd);
1340                 RETURN(0);
1341         }
1342         if (KEY_IS(KEY_FLUSH_CTX)) {
1343                 sptlrpc_import_flush_my_ctx(imp);
1344                 RETURN(0);
1345         }
1346         if (KEY_IS(KEY_MDS_CONN)) {
1347                 /* mds-mds import */
1348                 cfs_spin_lock(&imp->imp_lock);
1349                 imp->imp_server_timeout = 1;
1350                 cfs_spin_unlock(&imp->imp_lock);
1351                 imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
1352                 CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name);
1353                 RETURN(0);
1354         }
1355         if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
1356                 rc = target_set_info_rpc(imp, MDS_SET_INFO,
1357                                          keylen, key, vallen, val, set);
1358                 RETURN(rc);
1359         }
1360         if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) {
1361                 rc = mdc_hsm_copytool_send(vallen, val);
1362                 RETURN(rc);
1363         }
1364
1365         RETURN(rc);
1366 }
1367
1368 int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key,
1369                  __u32 *vallen, void *val, struct lov_stripe_md *lsm)
1370 {
1371         int rc = -EINVAL;
1372
1373         if (KEY_IS(KEY_MAX_EASIZE)) {
1374                 int mdsize, *max_easize;
1375
1376                 if (*vallen != sizeof(int))
1377                         RETURN(-EINVAL);
1378                 mdsize = *(int*)val;
1379                 if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
1380                         exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
1381                 max_easize = val;
1382                 *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
1383                 RETURN(0);
1384         }
1385         if (KEY_IS(KEY_CONN_DATA)) {
1386                 struct obd_import *imp = class_exp2cliimp(exp);
1387                 struct obd_connect_data *data = val;
1388
1389                 if (*vallen != sizeof(*data))
1390                         RETURN(-EINVAL);
1391
1392                 *data = imp->imp_connect_data;
1393                 RETURN(0);
1394         }
1395
1396         rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val);
1397
1398         RETURN(rc);
1399 }
1400
1401 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1402                       __u64 max_age, __u32 flags)
1403 {
1404         struct ptlrpc_request *req;
1405         struct obd_statfs     *msfs;
1406         struct obd_import     *imp = NULL;
1407         int                    rc;
1408         ENTRY;
1409
1410
1411         /*Since the request might also come from lprocfs, so we need
1412          *sync this with client_disconnect_export Bug15684*/
1413         cfs_down_read(&obd->u.cli.cl_sem);
1414         if (obd->u.cli.cl_import)
1415                 imp = class_import_get(obd->u.cli.cl_import);
1416         cfs_up_read(&obd->u.cli.cl_sem);
1417         if (!imp)
1418                 RETURN(-ENODEV);
1419
1420         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS,
1421                                         LUSTRE_MDS_VERSION, MDS_STATFS);
1422         if (req == NULL)
1423                 GOTO(output, rc = -ENOMEM);
1424
1425         ptlrpc_request_set_replen(req);
1426
1427         if (flags & OBD_STATFS_NODELAY) {
1428                 /* procfs requests not want stay in wait for avoid deadlock */
1429                 req->rq_no_resend = 1;
1430                 req->rq_no_delay = 1;
1431         }
1432
1433         rc = ptlrpc_queue_wait(req);
1434         if (rc) {
1435                 /* check connection error first */
1436                 if (imp->imp_connect_error)
1437                         rc = imp->imp_connect_error;
1438                 GOTO(out, rc);
1439         }
1440
1441         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1442         if (msfs == NULL)
1443                 GOTO(out, rc = -EPROTO);
1444
1445         *osfs = *msfs;
1446         EXIT;
1447 out:
1448         ptlrpc_req_finished(req);
1449 output:
1450         class_import_put(imp);
1451         return rc;
1452 }
1453
1454 static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid,
1455                    struct obd_capa *oc, struct obd_client_handle *handle,
1456                    int flags)
1457 {
1458         struct ptlrpc_request *req;
1459         struct mdt_body       *body;
1460         int                    rc;
1461         ENTRY;
1462
1463         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_PIN);
1464         if (req == NULL)
1465                 RETURN(-ENOMEM);
1466
1467         mdc_set_capa_size(req, &RMF_CAPA1, oc);
1468
1469         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_PIN);
1470         if (rc) {
1471                 ptlrpc_request_free(req);
1472                 RETURN(rc);
1473         }
1474
1475         mdc_pack_body(req, fid, oc, 0, 0, -1, flags);
1476
1477         ptlrpc_request_set_replen(req);
1478
1479         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1480         rc = ptlrpc_queue_wait(req);
1481         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1482         if (rc) {
1483                 CERROR("Pin failed: %d\n", rc);
1484                 GOTO(err_out, rc);
1485         }
1486
1487         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1488         if (body == NULL)
1489                 GOTO(err_out, rc = -EPROTO);
1490
1491         handle->och_fh = body->handle;
1492         handle->och_magic = OBD_CLIENT_HANDLE_MAGIC;
1493
1494         handle->och_mod = obd_mod_alloc();
1495         if (handle->och_mod == NULL) {
1496                 DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data");
1497                 GOTO(err_out, rc = -ENOMEM);
1498         }
1499         handle->och_mod->mod_open_req = req; /* will be dropped by unpin */
1500
1501         RETURN(0);
1502
1503 err_out:
1504         ptlrpc_req_finished(req);
1505         RETURN(rc);
1506 }
1507
1508 static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle,
1509                      int flag)
1510 {
1511         struct ptlrpc_request *req;
1512         struct mdt_body       *body;
1513         int                    rc;
1514         ENTRY;
1515
1516         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_UNPIN,
1517                                         LUSTRE_MDS_VERSION, MDS_UNPIN);
1518         if (req == NULL)
1519                 RETURN(-ENOMEM);
1520
1521         body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
1522         body->handle = handle->och_fh;
1523         body->flags = flag;
1524
1525         ptlrpc_request_set_replen(req);
1526
1527         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1528         rc = ptlrpc_queue_wait(req);
1529         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
1530
1531         if (rc != 0)
1532                 CERROR("Unpin failed: %d\n", rc);
1533
1534         ptlrpc_req_finished(req);
1535         ptlrpc_req_finished(handle->och_mod->mod_open_req);
1536
1537         obd_mod_put(handle->och_mod);
1538         RETURN(rc);
1539 }
1540
1541 int mdc_sync(struct obd_export *exp, const struct lu_fid *fid,
1542              struct obd_capa *oc, struct ptlrpc_request **request)
1543 {
1544         struct ptlrpc_request *req;
1545         int                    rc;
1546         ENTRY;
1547
1548         *request = NULL;
1549         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
1550         if (req == NULL)
1551                 RETURN(-ENOMEM);
1552
1553         mdc_set_capa_size(req, &RMF_CAPA1, oc);
1554
1555         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
1556         if (rc) {
1557                 ptlrpc_request_free(req);
1558                 RETURN(rc);
1559         }
1560
1561         mdc_pack_body(req, fid, oc, 0, 0, -1, 0);
1562
1563         ptlrpc_request_set_replen(req);
1564
1565         rc = ptlrpc_queue_wait(req);
1566         if (rc)
1567                 ptlrpc_req_finished(req);
1568         else
1569                 *request = req;
1570         RETURN(rc);
1571 }
1572
1573 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
1574                             enum obd_import_event event)
1575 {
1576         int rc = 0;
1577
1578         LASSERT(imp->imp_obd == obd);
1579
1580         switch (event) {
1581         case IMP_EVENT_DISCON: {
1582 #if 0
1583                 /* XXX Pass event up to OBDs stack. used only for FLD now */
1584                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL);
1585 #endif
1586                 break;
1587         }
1588         case IMP_EVENT_INACTIVE: {
1589                 struct client_obd *cli = &obd->u.cli;
1590                 /*
1591                  * Flush current sequence to make client obtain new one
1592                  * from server in case of disconnect/reconnect.
1593                  * If range is already empty then no need to flush it.
1594                  */
1595                 if (cli->cl_seq != NULL &&
1596                     !range_is_exhausted(&cli->cl_seq->lcs_space)) {
1597                         seq_client_flush(cli->cl_seq);
1598                 }
1599
1600                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
1601                 break;
1602         }
1603         case IMP_EVENT_INVALIDATE: {
1604                 struct ldlm_namespace *ns = obd->obd_namespace;
1605
1606                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1607
1608                 break;
1609         }
1610         case IMP_EVENT_ACTIVE: {
1611                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
1612                 break;
1613         }
1614         case IMP_EVENT_OCD:
1615                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
1616                 break;
1617
1618         default:
1619                 CERROR("Unknown import event %x\n", event);
1620                 LBUG();
1621         }
1622         RETURN(rc);
1623 }
1624
1625 static int mdc_fid_init(struct obd_export *exp)
1626 {
1627         struct client_obd *cli = &exp->exp_obd->u.cli;
1628         char *prefix;
1629         int rc;
1630         ENTRY;
1631
1632         OBD_ALLOC_PTR(cli->cl_seq);
1633         if (cli->cl_seq == NULL)
1634                 RETURN(-ENOMEM);
1635
1636         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
1637         if (prefix == NULL)
1638                 GOTO(out_free_seq, rc = -ENOMEM);
1639
1640         snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s",
1641                  exp->exp_obd->obd_name);
1642
1643         /* Init client side sequence-manager */
1644         rc = seq_client_init(cli->cl_seq, exp,
1645                              LUSTRE_SEQ_METADATA,
1646                              prefix, NULL);
1647         OBD_FREE(prefix, MAX_OBD_NAME + 5);
1648         if (rc)
1649                 GOTO(out_free_seq, rc);
1650
1651         RETURN(rc);
1652 out_free_seq:
1653         OBD_FREE_PTR(cli->cl_seq);
1654         cli->cl_seq = NULL;
1655         return rc;
1656 }
1657
1658 static int mdc_fid_fini(struct obd_export *exp)
1659 {
1660         struct client_obd *cli = &exp->exp_obd->u.cli;
1661         ENTRY;
1662
1663         if (cli->cl_seq != NULL) {
1664                 seq_client_fini(cli->cl_seq);
1665                 OBD_FREE_PTR(cli->cl_seq);
1666                 cli->cl_seq = NULL;
1667         }
1668
1669         RETURN(0);
1670 }
1671
1672 int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1673                   struct md_op_data *op_data)
1674 {
1675         struct client_obd *cli = &exp->exp_obd->u.cli;
1676         struct lu_client_seq *seq = cli->cl_seq;
1677         ENTRY;
1678         RETURN(seq_client_alloc_fid(seq, fid));
1679 }
1680
1681 /* XXX This method is used only to clear current fid seq
1682  * once fld/mds insert failed */
1683 static int mdc_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
1684 {
1685         struct client_obd *cli = &exp->exp_obd->u.cli;
1686
1687         seq_client_flush(cli->cl_seq);
1688         return 0;
1689 }
1690
1691 struct obd_uuid *mdc_get_uuid(struct obd_export *exp) {
1692         struct client_obd *cli = &exp->exp_obd->u.cli;
1693         return &cli->cl_target_uuid;
1694 }
1695
1696 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
1697 {
1698         struct client_obd *cli = &obd->u.cli;
1699         struct lprocfs_static_vars lvars = { 0 };
1700         int rc;
1701         ENTRY;
1702
1703         OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1704         if (!cli->cl_rpc_lock)
1705                 RETURN(-ENOMEM);
1706         mdc_init_rpc_lock(cli->cl_rpc_lock);
1707
1708         ptlrpcd_addref();
1709
1710         OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1711         if (!cli->cl_setattr_lock)
1712                 GOTO(err_rpc_lock, rc = -ENOMEM);
1713         mdc_init_rpc_lock(cli->cl_setattr_lock);
1714
1715         OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1716         if (!cli->cl_close_lock)
1717                 GOTO(err_setattr_lock, rc = -ENOMEM);
1718         mdc_init_rpc_lock(cli->cl_close_lock);
1719
1720         rc = client_obd_setup(obd, cfg);
1721         if (rc)
1722                 GOTO(err_close_lock, rc);
1723         lprocfs_mdc_init_vars(&lvars);
1724         lprocfs_obd_setup(obd, lvars.obd_vars);
1725         sptlrpc_lprocfs_cliobd_attach(obd);
1726         ptlrpc_lprocfs_register_obd(obd);
1727
1728         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
1729         if (rc) {
1730                 mdc_cleanup(obd);
1731                 CERROR("failed to setup llogging subsystems\n");
1732         }
1733
1734         /* ignore errors */
1735         libcfs_klnl_start(LNL_TRANSPORT_HSM);
1736         libcfs_klnl_start(LNL_TRANSPORT_CHANGELOG);
1737
1738         RETURN(rc);
1739
1740 err_close_lock:
1741         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1742 err_setattr_lock:
1743         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1744 err_rpc_lock:
1745         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1746         ptlrpcd_decref();
1747         RETURN(rc);
1748 }
1749
1750 /* Initialize the default and maximum LOV EA and cookie sizes.  This allows
1751  * us to make MDS RPCs with large enough reply buffers to hold the
1752  * maximum-sized (= maximum striped) EA and cookie without having to
1753  * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */
1754 static int mdc_init_ea_size(struct obd_export *exp, int easize,
1755                      int def_easize, int cookiesize)
1756 {
1757         struct obd_device *obd = exp->exp_obd;
1758         struct client_obd *cli = &obd->u.cli;
1759         ENTRY;
1760
1761         if (cli->cl_max_mds_easize < easize)
1762                 cli->cl_max_mds_easize = easize;
1763
1764         if (cli->cl_default_mds_easize < def_easize)
1765                 cli->cl_default_mds_easize = def_easize;
1766
1767         if (cli->cl_max_mds_cookiesize < cookiesize)
1768                 cli->cl_max_mds_cookiesize = cookiesize;
1769
1770         RETURN(0);
1771 }
1772
1773 static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
1774 {
1775         int rc = 0;
1776         ENTRY;
1777
1778         switch (stage) {
1779         case OBD_CLEANUP_EARLY:
1780         case OBD_CLEANUP_EXPORTS:
1781                 /* If we set up but never connected, the
1782                    client import will not have been cleaned. */
1783                 if (obd->u.cli.cl_import) {
1784                         struct obd_import *imp;
1785                         cfs_down_write(&obd->u.cli.cl_sem);
1786                         imp = obd->u.cli.cl_import;
1787                         CERROR("client import never connected\n");
1788                         ptlrpc_invalidate_import(imp);
1789                         class_destroy_import(imp);
1790                         cfs_up_write(&obd->u.cli.cl_sem);
1791                         obd->u.cli.cl_import = NULL;
1792                 }
1793                 rc = obd_llog_finish(obd, 0);
1794                 if (rc != 0)
1795                         CERROR("failed to cleanup llogging subsystems\n");
1796                 break;
1797         }
1798         RETURN(rc);
1799 }
1800
1801 static int mdc_cleanup(struct obd_device *obd)
1802 {
1803         struct client_obd *cli = &obd->u.cli;
1804
1805         libcfs_klnl_stop(LNL_TRANSPORT_HSM, LNL_GRP_HSM);
1806         libcfs_klnl_stop(LNL_TRANSPORT_CHANGELOG, 0);
1807
1808         OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
1809         OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock));
1810         OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
1811
1812         ptlrpc_lprocfs_unregister_obd(obd);
1813         lprocfs_obd_cleanup(obd);
1814         ptlrpcd_decref();
1815
1816         return client_obd_cleanup(obd);
1817 }
1818
1819
1820 static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
1821                          struct obd_device *tgt, int *index)
1822 {
1823         struct llog_ctxt *ctxt;
1824         int rc;
1825         ENTRY;
1826
1827         LASSERT(olg == &obd->obd_olg);
1828
1829         rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL,
1830                         &llog_client_ops);
1831         if (rc)
1832                 RETURN(rc);
1833
1834         ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
1835         llog_initiator_connect(ctxt);
1836         llog_ctxt_put(ctxt);
1837
1838         rc = llog_setup(obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, 0, NULL,
1839                         &llog_client_ops);
1840         if (rc == 0) {
1841                 ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
1842                 llog_initiator_connect(ctxt);
1843                 llog_ctxt_put(ctxt);
1844         }
1845
1846         RETURN(rc);
1847 }
1848
1849 static int mdc_llog_finish(struct obd_device *obd, int count)
1850 {
1851         struct llog_ctxt *ctxt;
1852         int rc = 0;
1853         ENTRY;
1854
1855         ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT);
1856         if (ctxt)
1857                 rc = llog_cleanup(ctxt);
1858
1859         ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
1860         if (ctxt)
1861                 rc = llog_cleanup(ctxt);
1862
1863         RETURN(rc);
1864 }
1865
1866 static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
1867 {
1868         struct lustre_cfg *lcfg = buf;
1869         struct lprocfs_static_vars lvars = { 0 };
1870         int rc = 0;
1871
1872         lprocfs_mdc_init_vars(&lvars);
1873         switch (lcfg->lcfg_command) {
1874         default:
1875                 rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars,
1876                                               lcfg, obd);
1877                 if (rc > 0)
1878                         rc = 0;
1879                 break;
1880         }
1881         return(rc);
1882 }
1883
1884
1885 /* get remote permission for current user on fid */
1886 int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
1887                         struct obd_capa *oc, __u32 suppgid,
1888                         struct ptlrpc_request **request)
1889 {
1890         struct ptlrpc_request  *req;
1891         int                    rc;
1892         ENTRY;
1893
1894         LASSERT(client_is_remote(exp));
1895
1896         *request = NULL;
1897         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
1898         if (req == NULL)
1899                 RETURN(-ENOMEM);
1900
1901         mdc_set_capa_size(req, &RMF_CAPA1, oc);
1902
1903         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
1904         if (rc) {
1905                 ptlrpc_request_free(req);
1906                 RETURN(rc);
1907         }
1908
1909         mdc_pack_body(req, fid, oc, OBD_MD_FLRMTPERM, 0, suppgid, 0);
1910
1911         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
1912                              sizeof(struct mdt_remote_perm));
1913
1914         ptlrpc_request_set_replen(req);
1915
1916         rc = ptlrpc_queue_wait(req);
1917         if (rc)
1918                 ptlrpc_req_finished(req);
1919         else
1920                 *request = req;
1921         RETURN(rc);
1922 }
1923
1924 static int mdc_interpret_renew_capa(const struct lu_env *env,
1925                                     struct ptlrpc_request *req, void *unused,
1926                                     int status)
1927 {
1928         struct obd_capa *oc = req->rq_async_args.pointer_arg[0];
1929         renew_capa_cb_t cb = req->rq_async_args.pointer_arg[1];
1930         struct mdt_body *body = NULL;
1931         struct lustre_capa *capa;
1932         ENTRY;
1933
1934         if (status)
1935                 GOTO(out, capa = ERR_PTR(status));
1936
1937         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1938         if (body == NULL)
1939                 GOTO(out, capa = ERR_PTR(-EFAULT));
1940
1941         if ((body->valid & OBD_MD_FLOSSCAPA) == 0)
1942                 GOTO(out, capa = ERR_PTR(-ENOENT));
1943
1944         capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA2);
1945         if (!capa)
1946                 GOTO(out, capa = ERR_PTR(-EFAULT));
1947         EXIT;
1948 out:
1949         cb(oc, capa);
1950         return 0;
1951 }
1952
1953 static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
1954                           renew_capa_cb_t cb)
1955 {
1956         struct ptlrpc_request *req;
1957         ENTRY;
1958
1959         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR,
1960                                         LUSTRE_MDS_VERSION, MDS_GETATTR);
1961         if (req == NULL)
1962                 RETURN(-ENOMEM);
1963
1964         /* NB, OBD_MD_FLOSSCAPA is set here, but it doesn't necessarily mean the
1965          * capa to renew is oss capa.
1966          */
1967         mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0);
1968         ptlrpc_request_set_replen(req);
1969
1970         req->rq_async_args.pointer_arg[0] = oc;
1971         req->rq_async_args.pointer_arg[1] = cb;
1972         req->rq_interpret_reply = mdc_interpret_renew_capa;
1973         ptlrpcd_add_req(req, PSCOPE_OTHER);
1974         RETURN(0);
1975 }
1976
1977 static int mdc_connect(const struct lu_env *env,
1978                        struct obd_export **exp,
1979                        struct obd_device *obd, struct obd_uuid *cluuid,
1980                        struct obd_connect_data *data,
1981                        void *localdata)
1982 {
1983         struct obd_import *imp = obd->u.cli.cl_import;
1984
1985         /* mds-mds import features */
1986         if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) {
1987                 cfs_spin_lock(&imp->imp_lock);
1988                 imp->imp_server_timeout = 1;
1989                 cfs_spin_unlock(&imp->imp_lock);
1990                 imp->imp_client->cli_request_portal = MDS_MDS_PORTAL;
1991                 CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n",
1992                        obd->obd_name);
1993         }
1994
1995         return client_connect_import(env, exp, obd, cluuid, data, NULL);
1996 }
1997
1998 struct obd_ops mdc_obd_ops = {
1999         .o_owner            = THIS_MODULE,
2000         .o_setup            = mdc_setup,
2001         .o_precleanup       = mdc_precleanup,
2002         .o_cleanup          = mdc_cleanup,
2003         .o_add_conn         = client_import_add_conn,
2004         .o_del_conn         = client_import_del_conn,
2005         .o_connect          = mdc_connect,
2006         .o_disconnect       = client_disconnect_export,
2007         .o_iocontrol        = mdc_iocontrol,
2008         .o_set_info_async   = mdc_set_info_async,
2009         .o_statfs           = mdc_statfs,
2010         .o_pin              = mdc_pin,
2011         .o_unpin            = mdc_unpin,
2012         .o_fid_init         = mdc_fid_init,
2013         .o_fid_fini         = mdc_fid_fini,
2014         .o_fid_alloc        = mdc_fid_alloc,
2015         .o_fid_delete       = mdc_fid_delete,
2016         .o_import_event     = mdc_import_event,
2017         .o_llog_init        = mdc_llog_init,
2018         .o_llog_finish      = mdc_llog_finish,
2019         .o_get_info         = mdc_get_info,
2020         .o_process_config   = mdc_process_config,
2021         .o_get_uuid         = mdc_get_uuid,
2022 };
2023
2024 struct md_ops mdc_md_ops = {
2025         .m_getstatus        = mdc_getstatus,
2026         .m_change_cbdata    = mdc_change_cbdata,
2027         .m_close            = mdc_close,
2028         .m_create           = mdc_create,
2029         .m_done_writing     = mdc_done_writing,
2030         .m_enqueue          = mdc_enqueue,
2031         .m_getattr          = mdc_getattr,
2032         .m_getattr_name     = mdc_getattr_name,
2033         .m_intent_lock      = mdc_intent_lock,
2034         .m_link             = mdc_link,
2035         .m_is_subdir        = mdc_is_subdir,
2036         .m_rename           = mdc_rename,
2037         .m_setattr          = mdc_setattr,
2038         .m_setxattr         = mdc_setxattr,
2039         .m_getxattr         = mdc_getxattr,
2040         .m_sync             = mdc_sync,
2041         .m_readpage         = mdc_readpage,
2042         .m_unlink           = mdc_unlink,
2043         .m_cancel_unused    = mdc_cancel_unused,
2044         .m_init_ea_size     = mdc_init_ea_size,
2045         .m_set_lock_data    = mdc_set_lock_data,
2046         .m_lock_match       = mdc_lock_match,
2047         .m_get_lustre_md    = mdc_get_lustre_md,
2048         .m_free_lustre_md   = mdc_free_lustre_md,
2049         .m_set_open_replay_data = mdc_set_open_replay_data,
2050         .m_clear_open_replay_data = mdc_clear_open_replay_data,
2051         .m_renew_capa       = mdc_renew_capa,
2052         .m_unpack_capa      = mdc_unpack_capa,
2053         .m_get_remote_perm  = mdc_get_remote_perm,
2054         .m_intent_getattr_async = mdc_intent_getattr_async,
2055         .m_revalidate_lock      = mdc_revalidate_lock
2056 };
2057
2058 int __init mdc_init(void)
2059 {
2060         int rc;
2061         struct lprocfs_static_vars lvars = { 0 };
2062         lprocfs_mdc_init_vars(&lvars);
2063
2064         cfs_request_module("lquota");
2065         quota_interface = PORTAL_SYMBOL_GET(mdc_quota_interface);
2066         init_obd_quota_ops(quota_interface, &mdc_obd_ops);
2067
2068         rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars,
2069                                  LUSTRE_MDC_NAME, NULL);
2070         if (rc && quota_interface)
2071                 PORTAL_SYMBOL_PUT(mdc_quota_interface);
2072
2073         RETURN(rc);
2074 }
2075
#ifdef __KERNEL__
/* Module exit: drop the quota interface symbol reference taken in
 * mdc_init(), if any, then unregister the MDC device type. */
static void /*__exit*/ mdc_exit(void)
{
        if (quota_interface != NULL) {
                PORTAL_SYMBOL_PUT(mdc_quota_interface);
        }

        class_unregister_type(LUSTRE_MDC_NAME);
}

/* Kernel module metadata and entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Metadata Client");
MODULE_LICENSE("GPL");

module_init(mdc_init);
module_exit(mdc_exit);
#endif