Whamcloud - gitweb
LU-16314 llite: Migrate LASSERTF %p to %px
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDC
33
34 #include <linux/init.h>
35 #include <linux/kthread.h>
36 #include <linux/module.h>
37 #include <linux/pagemap.h>
38 #include <linux/user_namespace.h>
39 #include <linux/utsname.h>
40 #include <linux/delay.h>
41 #include <linux/uidgid.h>
42 #include <linux/device.h>
43 #include <linux/xarray.h>
44
45 #include <lustre_errno.h>
46
47 #include <cl_object.h>
48 #include <llog_swab.h>
49 #include <lprocfs_status.h>
50 #include <lustre_acl.h>
51 #include <lustre_compat.h>
52 #include <lustre_fid.h>
53 #include <uapi/linux/lustre/lustre_ioctl.h>
54 #include <lustre_ioctl_old.h>
55 #include <lustre_kernelcomm.h>
56 #include <lustre_lmv.h>
57 #include <lustre_log.h>
58 #include <lustre_osc.h>
59 #include <lustre_swab.h>
60 #include <obd_class.h>
61
62 #include "mdc_internal.h"
63
64 #define REQUEST_MINOR 244
65
66 static int mdc_cleanup(struct obd_device *obd);
67
68 static inline int mdc_queue_wait(struct ptlrpc_request *req)
69 {
70         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
71         int rc;
72
73         /* obd_get_request_slot() ensures that this client has no more
74          * than cl_max_rpcs_in_flight RPCs simultaneously inf light
75          * against an MDT. */
76         rc = obd_get_request_slot(cli);
77         if (rc != 0)
78                 return rc;
79
80         rc = ptlrpc_queue_wait(req);
81         obd_put_request_slot(cli);
82
83         return rc;
84 }
85
86 /*
87  * Send MDS_GET_ROOT RPC to fetch root FID.
88  *
89  * If \a fileset is not NULL it should contain a subdirectory off
90  * the ROOT/ directory to be mounted on the client. Return the FID
91  * of the subdirectory to the client to mount onto its mountpoint.
92  *
93  * \param[in]   imp     MDC import
94  * \param[in]   fileset fileset name, which could be NULL
95  * \param[out]  rootfid root FID of this mountpoint
96  * \param[out]  pc      root capa will be unpacked and saved in this pointer
97  *
98  * \retval      0 on success, negative errno on failure
99  */
100 static int mdc_get_root(struct obd_export *exp, const char *fileset,
101                          struct lu_fid *rootfid)
102 {
103         struct ptlrpc_request   *req;
104         struct mdt_body         *body;
105         int                      rc;
106
107         ENTRY;
108
109         if (fileset && !(exp_connect_flags(exp) & OBD_CONNECT_SUBTREE))
110                 RETURN(-EOPNOTSUPP);
111
112         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
113                                 &RQF_MDS_GET_ROOT);
114         if (req == NULL)
115                 RETURN(-ENOMEM);
116
117         if (fileset != NULL)
118                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
119                                      strlen(fileset) + 1);
120         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_ROOT);
121         if (rc) {
122                 ptlrpc_request_free(req);
123                 RETURN(rc);
124         }
125         mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
126         if (fileset != NULL) {
127                 char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
128
129                 memcpy(name, fileset, strlen(fileset));
130         }
131         lustre_msg_add_flags(req->rq_reqmsg, LUSTRE_IMP_FULL);
132         req->rq_send_state = LUSTRE_IMP_FULL;
133
134         ptlrpc_request_set_replen(req);
135
136         rc = ptlrpc_queue_wait(req);
137         if (rc)
138                 GOTO(out, rc);
139
140         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
141         if (body == NULL)
142                 GOTO(out, rc = -EPROTO);
143
144         *rootfid = body->mbo_fid1;
145         CDEBUG(D_NET, "root fid="DFID", last_committed=%llu\n",
146                PFID(rootfid), lustre_msg_get_last_committed(req->rq_repmsg));
147         EXIT;
148 out:
149         ptlrpc_req_finished(req);
150
151         return rc;
152 }
153
154 /*
155  * This function now is known to always saying that it will receive 4 buffers
156  * from server. Even for cases when acl_size and md_size is zero, RPC header
157  * will contain 4 fields and RPC itself will contain zero size fields. This is
158  * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed
159  * and thus zero, it shrinks it, making zero size. The same story about
160  * md_size. And this is course of problem when client waits for smaller number
161  * of fields. This issue will be fixed later when client gets aware of RPC
162  * layouts.  --umka
163  */
164 static int mdc_getattr_common(struct obd_export *exp,
165                               struct ptlrpc_request *req,
166                               struct md_op_data *op_data)
167 {
168         struct req_capsule *pill = &req->rq_pill;
169         struct mdt_body    *body;
170         void               *eadata;
171         int                 rc;
172         ENTRY;
173
174         /* Request message already built. */
175         rc = ptlrpc_queue_wait(req);
176         if (rc != 0)
177                 RETURN(rc);
178
179         /* sanity check for the reply */
180         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
181         if (body == NULL)
182                 RETURN(-EPROTO);
183
184         CDEBUG(D_NET, "mode: %o\n", body->mbo_mode);
185
186         mdc_update_max_ea_from_body(exp, body);
187         if (body->mbo_eadatasize != 0) {
188                 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
189                                                       body->mbo_eadatasize);
190                 if (eadata == NULL)
191                         RETURN(-EPROTO);
192         }
193
194         /* If encryption context was returned by MDT, put it in op_data
195          * so that caller can set it on inode and save an extra getxattr.
196          */
197         if (op_data && op_data->op_valid & OBD_MD_ENCCTX &&
198             body->mbo_valid & OBD_MD_ENCCTX) {
199                 op_data->op_file_encctx =
200                         req_capsule_server_get(pill, &RMF_FILE_ENCCTX);
201                 op_data->op_file_encctx_size =
202                         req_capsule_get_size(pill, &RMF_FILE_ENCCTX,
203                                              RCL_SERVER);
204         }
205
206         RETURN(0);
207 }
208
209 static void mdc_reset_acl_req(struct ptlrpc_request *req)
210 {
211         spin_lock(&req->rq_early_free_lock);
212         sptlrpc_cli_free_repbuf(req);
213         req->rq_repbuf = NULL;
214         req->rq_repbuf_len = 0;
215         req->rq_repdata = NULL;
216         req->rq_reqdata_len = 0;
217         spin_unlock(&req->rq_early_free_lock);
218 }
219
220 static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
221                        struct ptlrpc_request **request)
222 {
223         struct ptlrpc_request *req;
224         struct obd_device *obd = class_exp2obd(exp);
225         struct obd_import *imp = class_exp2cliimp(exp);
226         __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
227         int rc;
228         ENTRY;
229
230         /* Single MDS without an LMV case */
231         if (op_data->op_flags & MF_GET_MDT_IDX) {
232                 op_data->op_mds = 0;
233                 RETURN(0);
234         }
235
236         *request = NULL;
237         req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR);
238         if (req == NULL)
239                 RETURN(-ENOMEM);
240
241         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
242         if (rc) {
243                 ptlrpc_request_free(req);
244                 RETURN(rc);
245         }
246
247         /* LU-15245: avoid deadlock with modifying RPCs on MDS_REQUEST_PORTAL */
248         req->rq_request_portal = MDS_READPAGE_PORTAL;
249
250 again:
251         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
252                       op_data->op_mode, -1, 0);
253         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
254         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
255                              op_data->op_mode);
256         if (exp_connect_encrypt(exp) && op_data->op_valid & OBD_MD_ENCCTX)
257                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
258                                      RCL_SERVER,
259                                      obd->u.cli.cl_max_mds_easize);
260         else
261                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
262                                      RCL_SERVER, 0);
263         ptlrpc_request_set_replen(req);
264
265         rc = mdc_getattr_common(exp, req, op_data);
266         if (rc) {
267                 if (rc == -ERANGE) {
268                         acl_bufsize = min_t(__u32,
269                                             imp->imp_connect_data.ocd_max_easize,
270                                             XATTR_SIZE_MAX);
271                         mdc_reset_acl_req(req);
272                         goto again;
273                 }
274
275                 ptlrpc_req_finished(req);
276         } else {
277                 *request = req;
278         }
279
280         RETURN(rc);
281 }
282
283 static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
284                             struct ptlrpc_request **request)
285 {
286         struct ptlrpc_request *req;
287         struct obd_import *imp = class_exp2cliimp(exp);
288         __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
289         int rc;
290         ENTRY;
291
292         *request = NULL;
293         req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR_NAME);
294         if (req == NULL)
295                 RETURN(-ENOMEM);
296
297         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
298                              op_data->op_namelen + 1);
299
300         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME);
301         if (rc) {
302                 ptlrpc_request_free(req);
303                 RETURN(rc);
304         }
305
306         if (op_data->op_name) {
307                 char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
308                 LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
309                                 op_data->op_namelen);
310                 memcpy(name, op_data->op_name, op_data->op_namelen);
311         }
312
313 again:
314         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
315                       op_data->op_mode, op_data->op_suppgids[0], 0);
316         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
317                              op_data->op_mode);
318         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
320         ptlrpc_request_set_replen(req);
321         if (op_data->op_bias & MDS_FID_OP) {
322                 struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
323                                                             &RMF_MDT_BODY);
324
325                 if (b) {
326                         b->mbo_valid |= OBD_MD_NAMEHASH;
327                         b->mbo_fid2 = op_data->op_fid2;
328                 }
329         }
330
331         rc = mdc_getattr_common(exp, req, NULL);
332         if (rc) {
333                 if (rc == -ERANGE) {
334                         acl_bufsize = min_t(__u32,
335                                             imp->imp_connect_data.ocd_max_easize,
336                                             XATTR_SIZE_MAX);
337                         mdc_reset_acl_req(req);
338                         goto again;
339                 }
340
341                 ptlrpc_req_finished(req);
342         } else {
343                 *request = req;
344         }
345
346         RETURN(rc);
347 }
348
349 static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
350                             const struct lu_fid *fid, int opcode, u64 valid,
351                             const char *xattr_name, const char *input,
352                             int input_size, int output_size, int flags,
353                             __u32 suppgid, struct ptlrpc_request **request)
354 {
355         struct ptlrpc_request *req;
356         struct sptlrpc_sepol *sepol;
357         int   xattr_namelen = 0;
358         char *tmp;
359         int   rc;
360         ENTRY;
361
362         *request = NULL;
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt);
364         if (req == NULL)
365                 RETURN(-ENOMEM);
366
367         if (xattr_name) {
368                 xattr_namelen = strlen(xattr_name) + 1;
369                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370                                      xattr_namelen);
371         }
372         if (input_size)
373                 LASSERT(input);
374         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
375                              input_size);
376
377         /* get SELinux policy info if any */
378         sepol = sptlrpc_sepol_get(req);
379         if (IS_ERR(sepol))
380                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
381
382         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
383                              sptlrpc_sepol_size(sepol));
384
385         /* Flush local XATTR locks to get rid of a possible cancel RPC */
386         if (opcode == MDS_REINT && fid_is_sane(fid) &&
387             exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
388                 LIST_HEAD(cancels);
389                 int count;
390
391                 /* Without that packing would fail */
392                 if (input_size == 0)
393                         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
394                                              RCL_CLIENT, 0);
395
396                 count = mdc_resource_get_unused(exp, fid,
397                                                 &cancels, LCK_EX,
398                                                 MDS_INODELOCK_XATTR);
399
400                 rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
401                 if (rc)
402                         GOTO(err_put_sepol, rc);
403         } else {
404                 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
405                 if (rc)
406                         GOTO(err_put_sepol, rc);
407         }
408
409         if (opcode == MDS_REINT) {
410                 struct mdt_rec_setxattr *rec;
411
412                 BUILD_BUG_ON(sizeof(struct mdt_rec_setxattr) !=
413                              sizeof(struct mdt_rec_reint));
414                 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
415                 rec->sx_opcode = REINT_SETXATTR;
416                 rec->sx_fsuid  = from_kuid(&init_user_ns, current_fsuid());
417                 rec->sx_fsgid  = from_kgid(&init_user_ns, current_fsgid());
418                 rec->sx_cap = ll_capability_u32(current_cap());
419                 rec->sx_suppgid1 = suppgid;
420                 rec->sx_suppgid2 = -1;
421                 rec->sx_fid    = *fid;
422                 rec->sx_valid  = valid | OBD_MD_FLCTIME;
423                 rec->sx_time   = ktime_get_real_seconds();
424                 rec->sx_size   = output_size;
425                 rec->sx_flags  = flags;
426         } else {
427                 mdc_pack_body(&req->rq_pill, fid, valid, output_size,
428                               suppgid, flags);
429                 /* Avoid deadlock with modifying RPCs on MDS_REQUEST_PORTAL.
430                  * See LU-15245.
431                  */
432                 req->rq_request_portal = MDS_READPAGE_PORTAL;
433         }
434
435         if (xattr_name) {
436                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
437                 memcpy(tmp, xattr_name, xattr_namelen);
438         }
439         if (input_size) {
440                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
441                 memcpy(tmp, input, input_size);
442         }
443
444         mdc_file_sepol_pack(&req->rq_pill, sepol);
445         sptlrpc_sepol_put(sepol);
446
447         if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER))
448                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
449                                      RCL_SERVER, output_size);
450         ptlrpc_request_set_replen(req);
451
452         /* make rpc */
453         if (opcode == MDS_REINT)
454                 ptlrpc_get_mod_rpc_slot(req);
455
456         rc = ptlrpc_queue_wait(req);
457
458         if (opcode == MDS_REINT)
459                 ptlrpc_put_mod_rpc_slot(req);
460
461         if (rc)
462                 ptlrpc_req_finished(req);
463         else
464                 *request = req;
465         RETURN(rc);
466
467 err_put_sepol:
468         sptlrpc_sepol_put(sepol);
469 err_free_rq:
470         ptlrpc_request_free(req);
471         RETURN(rc);
472
473 }
474
475 static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
476                         u64 obd_md_valid, const char *name,
477                         const void *value, size_t value_size,
478                         unsigned int xattr_flags, u32 suppgid,
479                         struct ptlrpc_request **req)
480 {
481         LASSERT(obd_md_valid == OBD_MD_FLXATTR ||
482                 obd_md_valid == OBD_MD_FLXATTRRM);
483
484         return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
485                                 fid, MDS_REINT, obd_md_valid, name,
486                                 value, value_size, 0, xattr_flags, suppgid,
487                                 req);
488 }
489
490 static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
491                         u64 obd_md_valid, const char *name, size_t buf_size,
492                         struct ptlrpc_request **req)
493 {
494         struct mdt_body *body;
495         int rc;
496
497         LASSERT(obd_md_valid == OBD_MD_FLXATTR ||
498                 obd_md_valid == OBD_MD_FLXATTRLS);
499
500         /* Message below is checked in sanity-selinux test_20d
501          * and sanity-sec test_49
502          */
503         CDEBUG(D_INFO, "%s: get xattr '%s' for "DFID"\n",
504                exp->exp_obd->obd_name, name, PFID(fid));
505         rc = mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, MDS_GETXATTR,
506                               obd_md_valid, name, NULL, 0, buf_size, 0, -1,
507                               req);
508         if (rc < 0)
509                 GOTO(out, rc);
510
511         body = req_capsule_server_get(&(*req)->rq_pill, &RMF_MDT_BODY);
512         if (body == NULL)
513                 GOTO(out, rc = -EPROTO);
514
515         /* only detect the xattr size */
516         if (buf_size == 0) {
517                 /* LU-11109: Older MDTs do not distinguish
518                  * between nonexistent xattrs and zero length
519                  * values in this case. Newer MDTs will return
520                  * -ENODATA or set OBD_MD_FLXATTR. */
521                 GOTO(out, rc = body->mbo_eadatasize);
522         }
523
524         if (body->mbo_eadatasize == 0) {
525                 /* LU-11109: Newer MDTs set OBD_MD_FLXATTR on
526                  * success so that we can distinguish between
527                  * zero length value and nonexistent xattr.
528                  *
529                  * If OBD_MD_FLXATTR is not set then we keep
530                  * the old behavior and return -ENODATA for
531                  * getxattr() when mbo_eadatasize is 0. But
532                  * -ENODATA only makes sense for getxattr()
533                  * and not for listxattr(). */
534                 if (body->mbo_valid & OBD_MD_FLXATTR)
535                         GOTO(out, rc = 0);
536                 else if (obd_md_valid == OBD_MD_FLXATTR)
537                         GOTO(out, rc = -ENODATA);
538                 else
539                         GOTO(out, rc = 0);
540         }
541
542         GOTO(out, rc = body->mbo_eadatasize);
543 out:
544         if (rc < 0) {
545                 ptlrpc_req_finished(*req);
546                 *req = NULL;
547         }
548
549         return rc;
550 }
551
552 static int mdc_get_lustre_md(struct obd_export *exp, struct req_capsule *pill,
553                              struct obd_export *dt_exp,
554                              struct obd_export *md_exp,
555                              struct lustre_md *md)
556 {
557         int rc;
558         ENTRY;
559
560         LASSERT(md);
561         memset(md, 0, sizeof(*md));
562
563         md->body = req_capsule_server_get(pill, &RMF_MDT_BODY);
564         LASSERT(md->body != NULL);
565
566         if (md->body->mbo_valid & OBD_MD_FLEASIZE) {
567                 if (!S_ISREG(md->body->mbo_mode)) {
568                         CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, should be a "
569                                "regular file, but is not\n");
570                         GOTO(out, rc = -EPROTO);
571                 }
572
573                 if (md->body->mbo_eadatasize == 0) {
574                         CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, "
575                                "but eadatasize 0\n");
576                         GOTO(out, rc = -EPROTO);
577                 }
578
579                 md->layout.lb_len = md->body->mbo_eadatasize;
580                 md->layout.lb_buf = req_capsule_server_sized_get(pill,
581                                                         &RMF_MDT_MD,
582                                                         md->layout.lb_len);
583                 if (md->layout.lb_buf == NULL)
584                         GOTO(out, rc = -EPROTO);
585         } else if (md->body->mbo_valid & OBD_MD_FLDIREA) {
586                 const union lmv_mds_md *lmv;
587                 size_t lmv_size;
588
589                 if (!S_ISDIR(md->body->mbo_mode)) {
590                         CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a "
591                                "directory, but is not\n");
592                         GOTO(out, rc = -EPROTO);
593                 }
594
595                 if (md_exp->exp_obd->obd_type->typ_lu == &mdc_device_type) {
596                         CERROR("%s: no LMV, upgrading from old version?\n",
597                                md_exp->exp_obd->obd_name);
598
599                         GOTO(out_acl, rc = 0);
600                 }
601
602                 if (md->body->mbo_valid & OBD_MD_MEA) {
603                         lmv_size = md->body->mbo_eadatasize;
604                         if (lmv_size == 0) {
605                                 CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, "
606                                        "but eadatasize 0\n");
607                                 RETURN(-EPROTO);
608                         }
609
610                         lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
611                                                            lmv_size);
612                         if (lmv == NULL)
613                                 GOTO(out, rc = -EPROTO);
614
615                         rc = md_stripe_object_create(md_exp, &md->lsm_obj,
616                                                      lmv, lmv_size);
617                         if (rc < 0)
618                                 GOTO(out, rc);
619                 }
620
621                 /* since 2.12.58 intent_getattr fetches default LMV */
622                 if (md->body->mbo_valid & OBD_MD_DEFAULT_MEA) {
623                         lmv_size = req_capsule_get_size(pill,
624                                                         &RMF_DEFAULT_MDT_MD,
625                                                         RCL_SERVER);
626                         lmv = req_capsule_server_sized_get(pill,
627                                                            &RMF_DEFAULT_MDT_MD,
628                                                            lmv_size);
629                         if (!lmv)
630                                 GOTO(out, rc = -EPROTO);
631
632                         rc = md_stripe_object_create(md_exp, &md->def_lsm_obj,
633                                                      lmv, lmv_size);
634                         if (rc < 0)
635                                 GOTO(out, rc);
636                 }
637         }
638         rc = 0;
639
640 out_acl:
641         if (md->body->mbo_valid & OBD_MD_FLACL) {
642                 /* for ACL, it's possible that FLACL is set but aclsize is zero.
643                  * only when aclsize != 0 there's an actual segment for ACL
644                  * in reply buffer.
645                  */
646                 rc = mdc_unpack_acl(pill, md);
647                 if (rc)
648                         GOTO(out, rc);
649         }
650
651         EXIT;
652 out:
653         if (rc) {
654                 lmd_clear_acl(md);
655                 md_put_lustre_md(md_exp, md);
656         }
657
658         return rc;
659 }
660
661 void mdc_replay_open(struct ptlrpc_request *req)
662 {
663         struct md_open_data *mod = req->rq_cb_data;
664         struct ptlrpc_request *close_req;
665         struct obd_client_handle *och;
666         struct lustre_handle old_open_handle = { };
667         struct mdt_body *body;
668         struct ldlm_reply *rep;
669         ENTRY;
670
671         if (mod == NULL) {
672                 DEBUG_REQ(D_ERROR, req,
673                           "cannot properly replay without open data");
674                 EXIT;
675                 return;
676         }
677
678         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
679         LASSERT(body != NULL);
680
681         rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
682         if (rep != NULL && rep->lock_policy_res2 != 0)
683                 DEBUG_REQ(D_ERROR, req, "Open request replay failed with %ld ",
684                         (long int)rep->lock_policy_res2);
685
686         spin_lock(&req->rq_lock);
687         och = mod->mod_och;
688         if (och && och->och_open_handle.cookie)
689                 req->rq_early_free_repbuf = 1;
690         else
691                 req->rq_early_free_repbuf = 0;
692         spin_unlock(&req->rq_lock);
693
694         if (req->rq_early_free_repbuf) {
695                 struct lustre_handle *file_open_handle;
696
697                 LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
698
699                 file_open_handle = &och->och_open_handle;
700                 CDEBUG(D_HA, "updating handle from %#llx to %#llx\n",
701                        file_open_handle->cookie, body->mbo_open_handle.cookie);
702                 old_open_handle = *file_open_handle;
703                 *file_open_handle = body->mbo_open_handle;
704         }
705
706         close_req = mod->mod_close_req;
707         if (close_req) {
708                 __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
709                 struct mdt_ioepoch *epoch;
710
711                 LASSERT(opc == MDS_CLOSE);
712                 epoch = req_capsule_client_get(&close_req->rq_pill,
713                                                &RMF_MDT_EPOCH);
714                 LASSERT(epoch);
715
716                 if (req->rq_early_free_repbuf)
717                         LASSERT(old_open_handle.cookie ==
718                                 epoch->mio_open_handle.cookie);
719
720                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
721                 epoch->mio_open_handle = body->mbo_open_handle;
722         }
723         EXIT;
724 }
725
726 void mdc_commit_open(struct ptlrpc_request *req)
727 {
728         struct md_open_data *mod = req->rq_cb_data;
729         if (mod == NULL)
730                 return;
731
732         /**
733          * No need to touch md_open_data::mod_och, it holds a reference on
734          * \var mod and will zero references to each other, \var mod will be
735          * freed after that when md_open_data::mod_och will put the reference.
736          */
737
738         /**
739          * Do not let open request to disappear as it still may be needed
740          * for close rpc to happen (it may happen on evict only, otherwise
741          * ptlrpc_request::rq_replay does not let mdc_commit_open() to be
742          * called), just mark this rpc as committed to distinguish these 2
743          * cases, see mdc_close() for details. The open request reference will
744          * be put along with freeing \var mod.
745          */
746         ptlrpc_request_addref(req);
747         spin_lock(&req->rq_lock);
748         req->rq_committed = 1;
749         spin_unlock(&req->rq_lock);
750         req->rq_cb_data = NULL;
751         obd_mod_put(mod);
752 }
753
754 int mdc_set_open_replay_data(struct obd_export *exp,
755                              struct obd_client_handle *och,
756                              struct lookup_intent *it)
757 {
758         struct md_open_data     *mod;
759         struct mdt_rec_create   *rec;
760         struct mdt_body         *body;
761         struct ptlrpc_request   *open_req = it->it_request;
762         struct obd_import       *imp = open_req->rq_import;
763         ENTRY;
764
765         if (!open_req->rq_replay)
766                 RETURN(0);
767
768         rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT);
769         body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
770         LASSERT(rec != NULL);
771         /* Incoming message in my byte order (it's been swabbed). */
772         /* Outgoing messages always in my byte order. */
773         LASSERT(body != NULL);
774
775         /* Only if the import is replayable, we set replay_open data */
776         if (och && imp->imp_replayable) {
777                 mod = obd_mod_alloc();
778                 if (mod == NULL) {
779                         DEBUG_REQ(D_ERROR, open_req,
780                                   "cannot allocate md_open_data");
781                         RETURN(0);
782                 }
783
784                 /**
785                  * Take a reference on \var mod, to be freed on mdc_close().
786                  * It protects \var mod from being freed on eviction (commit
787                  * callback is called despite rq_replay flag).
788                  * Another reference for \var och.
789                  */
790                 obd_mod_get(mod);
791                 obd_mod_get(mod);
792
793                 spin_lock(&open_req->rq_lock);
794                 och->och_mod = mod;
795                 mod->mod_och = och;
796                 mod->mod_is_create = it_disposition(it, DISP_OPEN_CREATE) ||
797                                      it_disposition(it, DISP_OPEN_STRIPE);
798                 mod->mod_open_req = open_req;
799                 open_req->rq_cb_data = mod;
800                 open_req->rq_commit_cb = mdc_commit_open;
801                 open_req->rq_early_free_repbuf = 1;
802                 spin_unlock(&open_req->rq_lock);
803         }
804
805         rec->cr_fid2 = body->mbo_fid1;
806         rec->cr_open_handle_old = body->mbo_open_handle;
807         open_req->rq_replay_cb = mdc_replay_open;
808         if (!fid_is_sane(&body->mbo_fid1)) {
809                 DEBUG_REQ(D_ERROR, open_req,
810                           "saving replay request with insane FID " DFID,
811                           PFID(&body->mbo_fid1));
812                 LBUG();
813         }
814
815         DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data");
816         RETURN(0);
817 }
818
819 static void mdc_free_open(struct md_open_data *mod)
820 {
821         int committed = 0;
822
823         if (mod->mod_is_create == 0 &&
824             imp_connect_disp_stripe(mod->mod_open_req->rq_import))
825                 committed = 1;
826
827         /**
828          * No reason to asssert here if the open request has
829          * rq_replay == 1. It means that mdc_close failed, and
830          * close request wasn`t sent. It is not fatal to client.
831          * The worst thing is eviction if the client gets open lock
832          **/
833
834         DEBUG_REQ(D_RPCTRACE, mod->mod_open_req,
835                   "free open request, rq_replay=%d",
836                   mod->mod_open_req->rq_replay);
837
838         ptlrpc_request_committed(mod->mod_open_req, committed);
839         if (mod->mod_close_req)
840                 ptlrpc_request_committed(mod->mod_close_req, committed);
841 }
842
843 static int mdc_clear_open_replay_data(struct obd_export *exp,
844                                       struct obd_client_handle *och)
845 {
846         struct md_open_data *mod = och->och_mod;
847         ENTRY;
848
849         /**
850          * It is possible to not have \var mod in a case of eviction between
851          * lookup and ll_file_open().
852          **/
853         if (mod == NULL)
854                 RETURN(0);
855
856         LASSERT(mod != LP_POISON);
857         LASSERT(mod->mod_open_req != NULL);
858
859         spin_lock(&mod->mod_open_req->rq_lock);
860         if (mod->mod_och)
861                 mod->mod_och->och_open_handle.cookie = 0;
862         mod->mod_open_req->rq_early_free_repbuf = 0;
863         spin_unlock(&mod->mod_open_req->rq_lock);
864         mdc_free_open(mod);
865
866         mod->mod_och = NULL;
867         och->och_mod = NULL;
868         obd_mod_put(mod);
869
870         RETURN(0);
871 }
872
873 static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
874                      struct md_open_data *mod, struct ptlrpc_request **request)
875 {
876         struct obd_device     *obd = class_exp2obd(exp);
877         struct ptlrpc_request *req;
878         struct req_format     *req_fmt;
879         size_t                 u32_count = 0;
880         int                    rc;
881         int                    saved_rc = 0;
882         ENTRY;
883
884         CDEBUG(D_INODE, "%s: "DFID" file closed with intent: %x\n",
885                exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
886                op_data->op_bias);
887
888         if (op_data->op_bias & MDS_CLOSE_INTENT) {
889                 req_fmt = &RQF_MDS_CLOSE_INTENT;
890                 if (op_data->op_bias & MDS_HSM_RELEASE) {
891                         /* allocate a FID for volatile file */
892                         rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2,
893                                            op_data);
894                         if (rc < 0) {
895                                 CERROR("%s: "DFID" allocating FID: rc = %d\n",
896                                        obd->obd_name, PFID(&op_data->op_fid1),
897                                        rc);
898                                 /* save the errcode and proceed to close */
899                                 saved_rc = rc;
900                         }
901                 }
902                 if (op_data->op_bias & MDS_CLOSE_RESYNC_DONE) {
903                         size_t count = op_data->op_data_size / sizeof(__u32);
904
905                         if (count > INLINE_RESYNC_ARRAY_SIZE)
906                                 u32_count = count;
907                 }
908         } else {
909                 req_fmt = &RQF_MDS_CLOSE;
910         }
911
912         *request = NULL;
913         if (CFS_FAIL_CHECK(OBD_FAIL_MDC_CLOSE))
914                 req = NULL;
915         else
916                 req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt);
917
918         /* Ensure that this close's handle is fixed up during replay. */
919         if (likely(mod != NULL)) {
920                 LASSERTF(mod->mod_open_req != NULL &&
921                          mod->mod_open_req->rq_type != LI_POISON,
922                          "POISONED open %px!\n", mod->mod_open_req);
923
924                 mod->mod_close_req = req;
925
926                 DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "matched open");
927                 /* We no longer want to preserve this open for replay even
928                  * though the open was committed. b=3632, b=3633 */
929                 spin_lock(&mod->mod_open_req->rq_lock);
930                 mod->mod_open_req->rq_replay = 0;
931                 spin_unlock(&mod->mod_open_req->rq_lock);
932         } else {
933                 CDEBUG(D_HA, "couldn't find open req; expecting close error\n");
934         }
935         if (req == NULL) {
936                 /**
937                  * TODO: repeat close after errors
938                  */
939                 CWARN("%s: close of FID "DFID" failed, file reference will be "
940                       "dropped when this client unmounts or is evicted\n",
941                       obd->obd_name, PFID(&op_data->op_fid1));
942                 GOTO(out, rc = -ENOMEM);
943         }
944
945         if (u32_count > 0)
946                 req_capsule_set_size(&req->rq_pill, &RMF_U32, RCL_CLIENT,
947                                      u32_count * sizeof(__u32));
948
949         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
950         if (rc) {
951                 ptlrpc_request_free(req);
952                 req = NULL;
953                 GOTO(out, rc);
954         }
955
956         /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
957          * portal whose threads are not taking any DLM locks and are therefore
958          * always progressing */
959         req->rq_request_portal = MDS_READPAGE_PORTAL;
960         ptlrpc_at_set_req_timeout(req);
961
962         if (!obd->u.cli.cl_lsom_update ||
963             !(exp_connect_flags2(exp) & OBD_CONNECT2_LSOM))
964                 op_data->op_xvalid &= ~(OP_XVALID_LAZYSIZE |
965                                         OP_XVALID_LAZYBLOCKS);
966
967         mdc_close_pack(&req->rq_pill, op_data);
968
969         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
970                              obd->u.cli.cl_default_mds_easize);
971
972         ptlrpc_request_set_replen(req);
973
974         ptlrpc_get_mod_rpc_slot(req);
975         rc = ptlrpc_queue_wait(req);
976         ptlrpc_put_mod_rpc_slot(req);
977
978         if (req->rq_repmsg == NULL) {
979                 CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req,
980                        req->rq_status);
981                 if (rc == 0)
982                         rc = req->rq_status ?: -EIO;
983         } else if (rc == 0 || rc == -EAGAIN) {
984                 struct mdt_body *body;
985
986                 rc = lustre_msg_get_status(req->rq_repmsg);
987                 if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
988                         DEBUG_REQ(D_ERROR, req,
989                                   "type = PTL_RPC_MSG_ERR: rc = %d", rc);
990                         if (rc > 0)
991                                 rc = -rc;
992                 }
993                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
994                 if (body == NULL)
995                         rc = -EPROTO;
996         } else if (rc == -ESTALE) {
997                 /**
998                  * it can be allowed error after 3633 if open was committed and
999                  * server failed before close was sent. Let's check if mod
1000                  * exists and return no error in that case
1001                  */
1002                 if (mod) {
1003                         DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
1004                         LASSERT(mod->mod_open_req != NULL);
1005                         if (mod->mod_open_req->rq_committed)
1006                                 rc = 0;
1007                 }
1008         }
1009
1010 out:
1011         if (mod) {
1012                 if (rc != 0)
1013                         mod->mod_close_req = NULL;
1014                 if (mod->mod_close_req)
1015                         ptlrpc_request_addref(mod->mod_close_req);
1016                 /* Since now, mod is accessed through open_req only,
1017                  * thus close req does not keep a reference on mod anymore. */
1018                 obd_mod_put(mod);
1019         }
1020         *request = req;
1021
1022         RETURN(rc < 0 ? rc : saved_rc);
1023 }
1024
1025 static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
1026                        u64 offset, struct page **pages, int npages,
1027                        struct ptlrpc_request **request)
1028 {
1029         struct ptlrpc_request   *req;
1030         struct ptlrpc_bulk_desc *desc;
1031         int                      i;
1032         int                      resends = 0;
1033         int                      rc;
1034         ENTRY;
1035
1036         *request = NULL;
1037
1038 restart_bulk:
1039         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
1040         if (req == NULL)
1041                 RETURN(-ENOMEM);
1042
1043         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
1044         if (rc) {
1045                 ptlrpc_request_free(req);
1046                 RETURN(rc);
1047         }
1048
1049         req->rq_request_portal = MDS_READPAGE_PORTAL;
1050         ptlrpc_at_set_req_timeout(req);
1051
1052         desc = ptlrpc_prep_bulk_imp(req, npages, 1,
1053                                     PTLRPC_BULK_PUT_SINK,
1054                                     MDS_BULK_PORTAL,
1055                                     &ptlrpc_bulk_kiov_pin_ops);
1056         if (desc == NULL) {
1057                 ptlrpc_req_finished(req);
1058                 RETURN(-ENOMEM);
1059         }
1060
1061         /* NB req now owns desc and will free it when it gets freed */
1062         for (i = 0; i < npages; i++)
1063                 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
1064                                                  PAGE_SIZE);
1065
1066         mdc_readdir_pack(&req->rq_pill, offset, PAGE_SIZE * npages, fid);
1067
1068         ptlrpc_request_set_replen(req);
1069         rc = ptlrpc_queue_wait(req);
1070         if (rc) {
1071                 ptlrpc_req_finished(req);
1072                 if (rc != -ETIMEDOUT)
1073                         RETURN(rc);
1074
1075                 resends++;
1076                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1077                         CERROR("%s: too many resend retries: rc = %d\n",
1078                                exp->exp_obd->obd_name, -EIO);
1079                         RETURN(-EIO);
1080                 }
1081
1082                 /* If a signal interrupts then the timeout returned will
1083                  * not be zero. In that case return -EINTR
1084                  */
1085                 if (msleep_interruptible(resends * 1000))
1086                         RETURN(-EINTR);
1087
1088                 goto restart_bulk;
1089         }
1090
1091         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
1092                                           req->rq_bulk->bd_nob_transferred);
1093         if (rc < 0) {
1094                 ptlrpc_req_finished(req);
1095                 RETURN(rc);
1096         }
1097
1098         if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
1099                 CERROR("%s: unexpected bytes transferred: %d (%ld expected)\n",
1100                        exp->exp_obd->obd_name, req->rq_bulk->bd_nob_transferred,
1101                        PAGE_SIZE * npages);
1102                 ptlrpc_req_finished(req);
1103                 RETURN(-EPROTO);
1104         }
1105
1106         *request = req;
1107         RETURN(0);
1108 }
1109
1110 static void mdc_release_page(struct page *page, int remove)
1111 {
1112         if (remove) {
1113                 lock_page(page);
1114                 if (likely(page->mapping != NULL))
1115                         cfs_delete_from_page_cache(page);
1116                 unlock_page(page);
1117         }
1118         put_page(page);
1119 }
1120
1121 static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
1122                                     __u64 *start, __u64 *end, int hash64)
1123 {
1124         /*
1125          * Complement of hash is used as an index so that
1126          * radix_tree_gang_lookup() can be used to find a page with starting
1127          * hash _smaller_ than one we are looking for.
1128          */
1129         unsigned long offset = hash_x_index(*hash, hash64);
1130         struct page *page;
1131         unsigned long flags;
1132         int found;
1133
1134         ll_xa_lock_irqsave(&mapping->i_pages, flags);
1135         found = radix_tree_gang_lookup(&mapping->page_tree,
1136                                        (void **)&page, offset, 1);
1137         if (found > 0 && !ll_xa_is_value(page)) {
1138                 struct lu_dirpage *dp;
1139
1140                 get_page(page);
1141                 ll_xa_unlock_irqrestore(&mapping->i_pages, flags);
1142                 /*
1143                  * In contrast to find_lock_page() we are sure that directory
1144                  * page cannot be truncated (while DLM lock is held) and,
1145                  * hence, can avoid restart.
1146                  *
1147                  * In fact, page cannot be locked here at all, because
1148                  * mdc_read_page_remote does synchronous io.
1149                  */
1150                 wait_on_page_locked(page);
1151                 if (PageUptodate(page)) {
1152                         dp = kmap(page);
1153                         if (BITS_PER_LONG == 32 && hash64) {
1154                                 *start = le64_to_cpu(dp->ldp_hash_start) >> 32;
1155                                 *end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
1156                                 *hash  = *hash >> 32;
1157                         } else {
1158                                 *start = le64_to_cpu(dp->ldp_hash_start);
1159                                 *end   = le64_to_cpu(dp->ldp_hash_end);
1160                         }
1161                         if (unlikely(*start == 1 && *hash == 0))
1162                                 *hash = *start;
1163                         else
1164                                 LASSERTF(*start <= *hash, "start = %#llx"
1165                                          ",end = %#llx,hash = %#llx\n",
1166                                          *start, *end, *hash);
1167                         CDEBUG(D_VFSTRACE, "offset %lx [%#llx %#llx],"
1168                               " hash %#llx\n", offset, *start, *end, *hash);
1169                         if (*hash > *end) {
1170                                 kunmap(page);
1171                                 mdc_release_page(page, 0);
1172                                 page = NULL;
1173                         } else if (*end != *start && *hash == *end) {
1174                                 /*
1175                                  * upon hash collision, remove this page,
1176                                  * otherwise put page reference, and
1177                                  * mdc_read_page_remote() will issue RPC to
1178                                  * fetch the page we want.
1179                                  */
1180                                 kunmap(page);
1181                                 mdc_release_page(page,
1182                                     le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE);
1183                                 page = NULL;
1184                         }
1185                 } else {
1186                         put_page(page);
1187                         page = ERR_PTR(-EIO);
1188                 }
1189         } else {
1190                 ll_xa_unlock_irqrestore(&mapping->i_pages, flags);
1191                 page = NULL;
1192         }
1193         return page;
1194 }
1195
1196 /*
1197  * Adjust a set of pages, each page containing an array of lu_dirpages,
1198  * so that each page can be used as a single logical lu_dirpage.
1199  *
1200  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
1201  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
1202  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
1203  * value is used as a cookie to request the next lu_dirpage in a
1204  * directory listing that spans multiple pages (two in this example):
1205  *   ________
1206  *  |        |
1207  * .|--------v-------   -----.
1208  * |s|e|f|p|ent|ent| ... |ent|
1209  * '--|--------------   -----'   Each PAGE contains a single
1210  *    '------.                   lu_dirpage.
1211  * .---------v-------   -----.
1212  * |s|e|f|p|ent| 0 | ... | 0 |
1213  * '-----------------   -----'
1214  *
1215  * However, on hosts where the native VM page size (PAGE_SIZE) is
1216  * larger than LU_PAGE_SIZE, a single host page may contain multiple
1217  * lu_dirpages. After reading the lu_dirpages from the MDS, the
1218  * ldp_hash_end of the first lu_dirpage refers to the one immediately
1219  * after it in the same PAGE (arrows simplified for brevity, but
1220  * in general e0==s1, e1==s2, etc.):
1221  *
1222  * .--------------------   -----.
1223  * |s0|e0|f0|p|ent|ent| ... |ent|
1224  * |---v----------------   -----|
1225  * |s1|e1|f1|p|ent|ent| ... |ent|
1226  * |---v----------------   -----|  Here, each PAGE contains
1227  *             ...                 multiple lu_dirpages.
1228  * |---v----------------   -----|
1229  * |s'|e'|f'|p|ent|ent| ... |ent|
1230  * '---|----------------   -----'
1231  *     v
1232  * .----------------------------.
1233  * |        next PAGE           |
1234  *
1235  * This structure is transformed into a single logical lu_dirpage as follows:
1236  *
1237  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
1238  *   labeled 'next PAGE'.
1239  *
1240  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
1241  *   a hash collision with the next page exists.
1242  *
1243  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
1244  *   to the first entry of the next lu_dirpage.
1245  */
#if PAGE_SIZE > LU_PAGE_SIZE
/*
 * Merge the lu_dirpages held in each of the first \a cfs_pgs host pages of
 * \a pages into one logical lu_dirpage per host page. \a lu_pgs is the total
 * number of lu_dirpages received and must be fully consumed on return.
 */
static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
{
	int idx;

	for (idx = 0; idx < cfs_pgs; idx++) {
		struct lu_dirpage *cur = kmap(pages[idx]);
		struct lu_dirpage *head = cur;
		struct lu_dirent *last = NULL;
		struct lu_dirent *ent;
		__u64 hash_end = cur->ldp_hash_end;
		__u32 flags = cur->ldp_flags;

		while (--lu_pgs > 0) {
			/* Find the last entry of the current lu_dirpage;
			 * it stays NULL when the page has no entries. */
			ent = lu_dirent_start(cur);
			last = ent;
			while (ent != NULL) {
				last = ent;
				ent = lu_dirent_next(ent);
			}

			/* Step to the following lu_dirpage. */
			cur = (struct lu_dirpage *)((char *)cur + LU_PAGE_SIZE);

			/* Stop once the host PAGE boundary is reached. */
			if (((unsigned long)cur & ~PAGE_MASK) == 0)
				break;

			/* Remember the hash and flags of this lu_dirpage. */
			hash_end = cur->ldp_hash_end;
			flags = cur->ldp_flags;

			/* An empty previous lu_dirpage ends the merge. */
			if (last == NULL)
				break;

			/* Stretch the terminating entry's lde_reclen from 0
			 * so it reaches the first entry of the next
			 * lu_dirpage. */
			LASSERT(le16_to_cpu(last->lde_reclen) == 0);
			last->lde_reclen =
				cpu_to_le16((char *)(cur->ldp_entries) -
					    (char *)last);
		}

		/* The head lu_dirpage takes the end hash and the collide
		 * flag of the last lu_dirpage visited. */
		head->ldp_hash_end = hash_end;
		head->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
		head->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);

		kunmap(pages[idx]);
	}
	LASSERTF(lu_pgs == 0, "left = %d\n", lu_pgs);
}
#else
/* Host pages hold exactly one lu_dirpage: nothing to adjust. */
#define mdc_adjust_dirpages(pages, cfs_pgs, lu_pgs) do {} while (0)
#endif  /* PAGE_SIZE > LU_PAGE_SIZE */
1298
1299 /* parameters for readdir page */
1300 struct readpage_param {
1301         struct md_op_data       *rp_mod;
1302         __u64                   rp_off;
1303         int                     rp_hash64;
1304         struct obd_export       *rp_exp;
1305 };
1306
1307 /**
1308  * Read pages from server.
1309  *
1310  * Page in MDS_READPAGE RPC is packed in LU_PAGE_SIZE, and each page contains
1311  * a header lu_dirpage which describes the start/end hash, and whether this
1312  * page is empty (contains no dir entry) or hash collide with next page.
1313  * After client receives reply, several pages will be integrated into dir page
1314  * in PAGE_SIZE (if PAGE_SIZE greater than LU_PAGE_SIZE), and the
1315  * lu_dirpage for this integrated page will be adjusted.
1316  **/
1317 static int ll_mdc_read_page_remote(void *data, struct page *page0)
1318 {
1319         struct readpage_param *rp = data;
1320         struct page **page_pool;
1321         struct page *page;
1322         struct lu_dirpage *dp;
1323         struct md_op_data *op_data = rp->rp_mod;
1324         struct ptlrpc_request *req;
1325         int max_pages;
1326         struct inode *inode;
1327         struct lu_fid *fid;
1328         int rd_pgs = 0; /* number of pages actually read */
1329         int npages;
1330         int i;
1331         int rc;
1332         ENTRY;
1333
1334         max_pages = rp->rp_exp->exp_obd->u.cli.cl_max_pages_per_rpc;
1335         inode = op_data->op_data;
1336         fid = &op_data->op_fid1;
1337         LASSERT(inode != NULL);
1338
1339         OBD_ALLOC_PTR_ARRAY_LARGE(page_pool, max_pages);
1340         if (page_pool != NULL) {
1341                 page_pool[0] = page0;
1342         } else {
1343                 page_pool = &page0;
1344                 max_pages = 1;
1345         }
1346
1347         for (npages = 1; npages < max_pages; npages++) {
1348                 page = page_cache_alloc(inode->i_mapping);
1349                 if (page == NULL)
1350                         break;
1351                 page_pool[npages] = page;
1352         }
1353
1354         rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, page_pool, npages, &req);
1355         if (rc < 0) {
1356                 /* page0 is special, which was added into page cache early */
1357                 cfs_delete_from_page_cache(page0);
1358         } else {
1359                 int lu_pgs;
1360
1361                 rd_pgs = (req->rq_bulk->bd_nob_transferred + PAGE_SIZE - 1) >>
1362                         PAGE_SHIFT;
1363                 lu_pgs = req->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
1364                 LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
1365
1366                 CDEBUG(D_INODE, "read %d(%d) pages\n", rd_pgs, lu_pgs);
1367
1368                 mdc_adjust_dirpages(page_pool, rd_pgs, lu_pgs);
1369
1370                 SetPageUptodate(page0);
1371         }
1372         unlock_page(page0);
1373
1374         ptlrpc_req_finished(req);
1375         CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages);
1376         for (i = 1; i < npages; i++) {
1377                 unsigned long   offset;
1378                 __u64           hash;
1379                 int ret;
1380
1381                 page = page_pool[i];
1382
1383                 if (rc < 0 || i >= rd_pgs) {
1384                         put_page(page);
1385                         continue;
1386                 }
1387
1388                 SetPageUptodate(page);
1389
1390                 dp = kmap(page);
1391                 hash = le64_to_cpu(dp->ldp_hash_start);
1392                 kunmap(page);
1393
1394                 offset = hash_x_index(hash, rp->rp_hash64);
1395
1396                 prefetchw(&page->flags);
1397                 ret = add_to_page_cache_lru(page, inode->i_mapping, offset,
1398                                             GFP_KERNEL);
1399                 if (ret == 0)
1400                         unlock_page(page);
1401                 else
1402                         CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:"
1403                                " rc = %d\n", offset, ret);
1404                 put_page(page);
1405         }
1406
1407         if (page_pool != &page0)
1408                 OBD_FREE_PTR_ARRAY_LARGE(page_pool, max_pages);
1409
1410         RETURN(rc);
1411 }
1412
#ifdef HAVE_READ_CACHE_PAGE_WANTS_FILE
/*
 * Adapter for kernels whose read callback takes (file, folio): the
 * readpage_param is carried in file->private_data and the first page of the
 * folio is the page to fill.
 */
static inline int mdc_read_folio_remote(struct file *file, struct folio *folio)
{
	void *param = file->private_data;
	struct page *first = folio_page(folio, 0);

	return ll_mdc_read_page_remote(param, first);
}
#else
/* Otherwise the callback signature already matches. */
#define mdc_read_folio_remote   ll_mdc_read_page_remote
#endif
1422
1423 /**
1424  * Read dir page from cache first, if it can not find it, read it from
1425  * server and add into the cache.
1426  *
1427  * \param[in] exp       MDC export
1428  * \param[in] op_data   client MD stack parameters, transfering parameters
1429  *                      between different layers on client MD stack.
1430  * \param[in] mrinfo    callback required for ldlm lock enqueue during
1431  *                      read page
1432  * \param[in] hash_offset the hash offset of the page to be read
1433  * \param[in] ppage     the page to be read
1434  *
1435  * retval               = 0 get the page successfully
1436  *                      errno(<0) get the page failed
1437  */
1438 static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
1439                          struct md_readdir_info *mrinfo, __u64 hash_offset,
1440                          struct page **ppage)
1441 {
1442         struct lookup_intent    it = { .it_op = IT_READDIR };
1443         struct page             *page;
1444         struct inode            *dir = op_data->op_data;
1445         struct address_space    *mapping;
1446         struct lu_dirpage       *dp;
1447         __u64                   start = 0;
1448         __u64                   end = 0;
1449         struct lustre_handle    lockh;
1450         struct ptlrpc_request   *enq_req = NULL;
1451         struct readpage_param   rp_param;
1452         int rc;
1453
1454         ENTRY;
1455
1456         *ppage = NULL;
1457
1458         LASSERT(dir != NULL);
1459         mapping = dir->i_mapping;
1460
1461         rc = mdc_intent_lock(exp, op_data, &it, &enq_req,
1462                              mrinfo->mr_blocking_ast, 0);
1463         if (enq_req != NULL)
1464                 ptlrpc_req_finished(enq_req);
1465
1466         if (rc < 0) {
1467                 CERROR("%s: "DFID" lock enqueue fails: rc = %d\n",
1468                        exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rc);
1469                 RETURN(rc);
1470         }
1471
1472         rc = 0;
1473         lockh.cookie = it.it_lock_handle;
1474         mdc_set_lock_data(exp, &lockh, dir, NULL);
1475
1476         rp_param.rp_off = hash_offset;
1477         rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64;
1478         page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end,
1479                                rp_param.rp_hash64);
1480         if (IS_ERR(page)) {
1481                 CERROR("%s: dir page locate: "DFID" at %llu: rc %ld\n",
1482                        exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1483                        rp_param.rp_off, PTR_ERR(page));
1484                 GOTO(out_unlock, rc = PTR_ERR(page));
1485         } else if (page != NULL) {
1486                 /*
1487                  * XXX nikita: not entirely correct handling of a corner case:
1488                  * suppose hash chain of entries with hash value HASH crosses
1489                  * border between pages P0 and P1. First both P0 and P1 are
1490                  * cached, seekdir() is called for some entry from the P0 part
1491                  * of the chain. Later P0 goes out of cache. telldir(HASH)
1492                  * happens and finds P1, as it starts with matching hash
1493                  * value. Remaining entries from P0 part of the chain are
1494                  * skipped. (Is that really a bug?)
1495                  *
1496                  * Possible solutions: 0. don't cache P1 is such case, handle
1497                  * it as an "overflow" page. 1. invalidate all pages at
1498                  * once. 2. use HASH|1 as an index for P1.
1499                  */
1500                 GOTO(hash_collision, page);
1501         }
1502
1503         rp_param.rp_exp = exp;
1504         rp_param.rp_mod = op_data;
1505         page = ll_read_cache_page(mapping,
1506                                   hash_x_index(rp_param.rp_off,
1507                                                rp_param.rp_hash64),
1508                                   mdc_read_folio_remote, &rp_param);
1509         if (IS_ERR(page)) {
1510                 CDEBUG(D_INFO, "%s: read cache page: "DFID" at %llu: %ld\n",
1511                        exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1512                        rp_param.rp_off, PTR_ERR(page));
1513                 GOTO(out_unlock, rc = PTR_ERR(page));
1514         }
1515
1516         wait_on_page_locked(page);
1517         (void)kmap(page);
1518         if (!PageUptodate(page)) {
1519                 CERROR("%s: page not updated: "DFID" at %llu: rc %d\n",
1520                        exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1521                        rp_param.rp_off, -5);
1522                 goto fail;
1523         }
1524         if (!PageChecked(page))
1525                 SetPageChecked(page);
1526         if (PageError(page)) {
1527                 CERROR("%s: page error: "DFID" at %llu: rc %d\n",
1528                        exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
1529                        rp_param.rp_off, -5);
1530                 goto fail;
1531         }
1532
1533 hash_collision:
1534         dp = page_address(page);
1535         if (BITS_PER_LONG == 32 && rp_param.rp_hash64) {
1536                 start = le64_to_cpu(dp->ldp_hash_start) >> 32;
1537                 end   = le64_to_cpu(dp->ldp_hash_end) >> 32;
1538                 rp_param.rp_off = hash_offset >> 32;
1539         } else {
1540                 start = le64_to_cpu(dp->ldp_hash_start);
1541                 end   = le64_to_cpu(dp->ldp_hash_end);
1542                 rp_param.rp_off = hash_offset;
1543         }
1544         if (end == start) {
1545                 LASSERT(start == rp_param.rp_off);
1546                 CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end);
1547 #if BITS_PER_LONG == 32
1548                 CWARN("Real page-wide hash collision at [%llu %llu] with "
1549                       "hash %llu\n", le64_to_cpu(dp->ldp_hash_start),
1550                       le64_to_cpu(dp->ldp_hash_end), hash_offset);
1551 #endif
1552
1553                 /*
1554                  * Fetch whole overflow chain...
1555                  *
1556                  * XXX not yet.
1557                  */
1558                 goto fail;
1559         }
1560         *ppage = page;
1561 out_unlock:
1562         ldlm_lock_decref(&lockh, it.it_lock_mode);
1563         return rc;
1564 fail:
1565         kunmap(page);
1566         mdc_release_page(page, 1);
1567         rc = -EIO;
1568         goto out_unlock;
1569 }
1570
1571 static int mdc_statfs_interpret(const struct lu_env *env,
1572                                 struct ptlrpc_request *req, void *args, int rc)
1573 {
1574         struct obd_info *oinfo = args;
1575         struct obd_statfs *osfs;
1576
1577         if (!rc) {
1578                 osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1579                 if (!osfs)
1580                         return -EPROTO;
1581
1582                 oinfo->oi_osfs = osfs;
1583
1584                 CDEBUG(D_CACHE, "blocks=%llu free=%llu avail=%llu "
1585                        "objects=%llu free=%llu state=%x\n",
1586                         osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
1587                         osfs->os_files, osfs->os_ffree, osfs->os_state);
1588         }
1589
1590         oinfo->oi_cb_up(oinfo, rc);
1591
1592         return rc;
1593 }
1594
1595 static int mdc_statfs_async(struct obd_export *exp,
1596                             struct obd_info *oinfo, time64_t max_age,
1597                             struct ptlrpc_request_set *unused)
1598 {
1599         struct ptlrpc_request *req;
1600         struct obd_info *aa;
1601
1602         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_STATFS,
1603                                         LUSTRE_MDS_VERSION, MDS_STATFS);
1604         if (req == NULL)
1605                 return -ENOMEM;
1606
1607         ptlrpc_request_set_replen(req);
1608         req->rq_interpret_reply = mdc_statfs_interpret;
1609
1610         aa = ptlrpc_req_async_args(aa, req);
1611         *aa = *oinfo;
1612
1613         ptlrpcd_add_req(req);
1614
1615         return 0;
1616 }
1617
1618 static int mdc_statfs(const struct lu_env *env,
1619                       struct obd_export *exp, struct obd_statfs *osfs,
1620                       time64_t max_age, __u32 flags)
1621 {
1622         struct obd_device *obd = class_exp2obd(exp);
1623         struct req_format *fmt;
1624         struct ptlrpc_request *req;
1625         struct obd_statfs *msfs;
1626         struct obd_import *imp, *imp0;
1627         int rc;
1628         ENTRY;
1629
1630         /*
1631          * Since the request might also come from lprocfs, so we need
1632          * sync this with client_disconnect_export Bug15684
1633          */
1634         with_imp_locked(obd, imp0, rc)
1635                 imp = class_import_get(imp0);
1636         if (rc)
1637                 RETURN(rc);
1638
1639         fmt = &RQF_MDS_STATFS;
1640         if ((exp_connect_flags2(exp) & OBD_CONNECT2_SUM_STATFS) &&
1641             (flags & OBD_STATFS_SUM))
1642                 fmt = &RQF_MDS_STATFS_NEW;
1643         req = ptlrpc_request_alloc_pack(imp, fmt, LUSTRE_MDS_VERSION,
1644                                         MDS_STATFS);
1645         if (req == NULL)
1646                 GOTO(output, rc = -ENOMEM);
1647         req->rq_allow_intr = 1;
1648
1649         if ((flags & OBD_STATFS_SUM) &&
1650             (exp_connect_flags2(exp) & OBD_CONNECT2_SUM_STATFS)) {
1651                 /* request aggregated states */
1652                 struct mdt_body *body;
1653
1654                 body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
1655                 if (body == NULL)
1656                         GOTO(out, rc = -EPROTO);
1657                 body->mbo_valid = OBD_MD_FLAGSTATFS;
1658         }
1659
1660         ptlrpc_request_set_replen(req);
1661
1662         if (flags & OBD_STATFS_NODELAY) {
1663                 /* procfs requests not want stay in wait for avoid deadlock */
1664                 req->rq_no_resend = 1;
1665                 req->rq_no_delay = 1;
1666         }
1667
1668         rc = ptlrpc_queue_wait(req);
1669         if (rc) {
1670                 /* check connection error first */
1671                 if (imp->imp_connect_error)
1672                         rc = imp->imp_connect_error;
1673                 GOTO(out, rc);
1674         }
1675
1676         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1677         if (msfs == NULL)
1678                 GOTO(out, rc = -EPROTO);
1679
1680         *osfs = *msfs;
1681         EXIT;
1682 out:
1683         ptlrpc_req_finished(req);
1684 output:
1685         class_import_put(imp);
1686         return rc;
1687 }
1688
1689 static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
1690 {
1691         __u32 keylen, vallen;
1692         void *key;
1693         int rc;
1694
1695         if (gf->gf_pathlen < 2)
1696                 RETURN(-EOVERFLOW);
1697
1698         /* Key is KEY_FID2PATH + getinfo_fid2path description */
1699         keylen = round_up(sizeof(KEY_FID2PATH) + sizeof(*gf) +
1700                                 sizeof(struct lu_fid), 8);
1701         OBD_ALLOC(key, keylen);
1702         if (key == NULL)
1703                 RETURN(-ENOMEM);
1704         memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH));
1705         memcpy(key + round_up(sizeof(KEY_FID2PATH), 8), gf, sizeof(*gf));
1706         memcpy(key + round_up(sizeof(KEY_FID2PATH), 8) + sizeof(*gf),
1707                gf->gf_u.gf_root_fid, sizeof(struct lu_fid));
1708         CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
1709                PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno);
1710
1711         if (!fid_is_sane(&gf->gf_fid))
1712                 GOTO(out, rc = -EINVAL);
1713
1714         /* Val is struct getinfo_fid2path result plus path */
1715         vallen = sizeof(*gf) + gf->gf_pathlen;
1716
1717         rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf);
1718         if (rc != 0 && rc != -EREMOTE)
1719                 GOTO(out, rc);
1720
1721         if (vallen <= sizeof(*gf))
1722                 GOTO(out, rc = -EPROTO);
1723         if (vallen > sizeof(*gf) + gf->gf_pathlen)
1724                 GOTO(out, rc = -EOVERFLOW);
1725
1726         CDEBUG(D_IOCTL, "path got "DFID" from %llu #%d: %.*s\n",
1727                PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno,
1728                /* only log the first 512 characters of the path */
1729                512, gf->gf_u.gf_path);
1730
1731 out:
1732         OBD_FREE(key, keylen);
1733         return rc;
1734 }
1735
1736 static int mdc_ioc_hsm_progress(struct obd_export *exp,
1737                                 struct hsm_progress_kernel *hpk)
1738 {
1739         struct obd_import               *imp = class_exp2cliimp(exp);
1740         struct hsm_progress_kernel      *req_hpk;
1741         struct ptlrpc_request           *req;
1742         int                              rc;
1743         ENTRY;
1744
1745         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS,
1746                                         LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS);
1747         if (req == NULL)
1748                 GOTO(out, rc = -ENOMEM);
1749
1750         mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
1751
1752         /* Copy hsm_progress struct */
1753         req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
1754         if (req_hpk == NULL)
1755                 GOTO(out, rc = -EPROTO);
1756
1757         *req_hpk = *hpk;
1758         req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval);
1759
1760         ptlrpc_request_set_replen(req);
1761
1762         ptlrpc_get_mod_rpc_slot(req);
1763         rc = ptlrpc_queue_wait(req);
1764         ptlrpc_put_mod_rpc_slot(req);
1765
1766         GOTO(out, rc);
1767 out:
1768         ptlrpc_req_finished(req);
1769         return rc;
1770 }
1771
1772 /**
1773  * Send hsm_ct_register to MDS
1774  *
1775  * \param[in]   imp             import
1776  * \param[in]   archive_count   if in bitmap format, it is the bitmap,
1777  *                              else it is the count of archive_ids
1778  * \param[in]   archives        if in bitmap format, it is NULL,
1779  *                              else it is archive_id lists
1780  */
1781 static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archive_count,
1782                                    __u32 *archives)
1783 {
1784         struct ptlrpc_request *req;
1785         __u32 *archive_array;
1786         size_t archives_size;
1787         int rc;
1788         ENTRY;
1789
1790         req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_CT_REGISTER);
1791         if (req == NULL)
1792                 RETURN(-ENOMEM);
1793
1794         if (archives != NULL)
1795                 archives_size = sizeof(*archive_array) * archive_count;
1796         else
1797                 archives_size = sizeof(archive_count);
1798
1799         req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_ARCHIVE,
1800                              RCL_CLIENT, archives_size);
1801
1802         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_CT_REGISTER);
1803         if (rc) {
1804                 ptlrpc_request_free(req);
1805                 RETURN(-ENOMEM);
1806         }
1807
1808         mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
1809
1810         archive_array = req_capsule_client_get(&req->rq_pill,
1811                                                &RMF_MDS_HSM_ARCHIVE);
1812         if (archive_array == NULL)
1813                 GOTO(out, rc = -EPROTO);
1814
1815         if (archives != NULL)
1816                 memcpy(archive_array, archives, archives_size);
1817         else
1818                 *archive_array = archive_count;
1819
1820         ptlrpc_request_set_replen(req);
1821
1822         rc = mdc_queue_wait(req);
1823         GOTO(out, rc);
1824 out:
1825         ptlrpc_req_finished(req);
1826         return rc;
1827 }
1828
1829 static int mdc_ioc_hsm_current_action(struct obd_export *exp,
1830                                       struct md_op_data *op_data)
1831 {
1832         struct hsm_current_action       *hca = op_data->op_data;
1833         struct hsm_current_action       *req_hca;
1834         struct ptlrpc_request           *req;
1835         int                              rc;
1836         ENTRY;
1837
1838         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1839                                    &RQF_MDS_HSM_ACTION);
1840         if (req == NULL)
1841                 RETURN(-ENOMEM);
1842
1843         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
1844         if (rc) {
1845                 ptlrpc_request_free(req);
1846                 RETURN(rc);
1847         }
1848
1849         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, 0, 0,
1850                       op_data->op_suppgids[0], 0);
1851
1852         ptlrpc_request_set_replen(req);
1853
1854         rc = mdc_queue_wait(req);
1855         if (rc)
1856                 GOTO(out, rc);
1857
1858         req_hca = req_capsule_server_get(&req->rq_pill,
1859                                          &RMF_MDS_HSM_CURRENT_ACTION);
1860         if (req_hca == NULL)
1861                 GOTO(out, rc = -EPROTO);
1862
1863         *hca = *req_hca;
1864
1865         EXIT;
1866 out:
1867         ptlrpc_req_finished(req);
1868         return rc;
1869 }
1870
1871 static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
1872 {
1873         struct ptlrpc_request   *req;
1874         int                      rc;
1875         ENTRY;
1876
1877         req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER,
1878                                         LUSTRE_MDS_VERSION,
1879                                         MDS_HSM_CT_UNREGISTER);
1880         if (req == NULL)
1881                 GOTO(out, rc = -ENOMEM);
1882
1883         mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
1884
1885         ptlrpc_request_set_replen(req);
1886
1887         rc = mdc_queue_wait(req);
1888         GOTO(out, rc);
1889 out:
1890         ptlrpc_req_finished(req);
1891         return rc;
1892 }
1893
1894 static int mdc_ioc_hsm_state_get(struct obd_export *exp,
1895                                  struct md_op_data *op_data)
1896 {
1897         struct hsm_user_state   *hus = op_data->op_data;
1898         struct hsm_user_state   *req_hus;
1899         struct ptlrpc_request   *req;
1900         int                      rc;
1901         ENTRY;
1902
1903         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1904                                    &RQF_MDS_HSM_STATE_GET);
1905         if (req == NULL)
1906                 RETURN(-ENOMEM);
1907
1908         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
1909         if (rc != 0) {
1910                 ptlrpc_request_free(req);
1911                 RETURN(rc);
1912         }
1913
1914         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, 0, 0,
1915                       op_data->op_suppgids[0], 0);
1916
1917         ptlrpc_request_set_replen(req);
1918
1919         rc = mdc_queue_wait(req);
1920         if (rc)
1921                 GOTO(out, rc);
1922
1923         req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
1924         if (req_hus == NULL)
1925                 GOTO(out, rc = -EPROTO);
1926
1927         *hus = *req_hus;
1928
1929         EXIT;
1930 out:
1931         ptlrpc_req_finished(req);
1932         return rc;
1933 }
1934
1935 static int mdc_ioc_hsm_state_set(struct obd_export *exp,
1936                                  struct md_op_data *op_data)
1937 {
1938         struct hsm_state_set    *hss = op_data->op_data;
1939         struct hsm_state_set    *req_hss;
1940         struct ptlrpc_request   *req;
1941         int                      rc;
1942         ENTRY;
1943
1944         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
1945                                    &RQF_MDS_HSM_STATE_SET);
1946         if (req == NULL)
1947                 RETURN(-ENOMEM);
1948
1949         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
1950         if (rc) {
1951                 ptlrpc_request_free(req);
1952                 RETURN(rc);
1953         }
1954
1955         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, 0, 0,
1956                       op_data->op_suppgids[0], 0);
1957
1958         /* Copy states */
1959         req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
1960         if (req_hss == NULL)
1961                 GOTO(out, rc = -EPROTO);
1962         *req_hss = *hss;
1963
1964         ptlrpc_request_set_replen(req);
1965
1966         ptlrpc_get_mod_rpc_slot(req);
1967         rc = ptlrpc_queue_wait(req);
1968         ptlrpc_put_mod_rpc_slot(req);
1969
1970         GOTO(out, rc);
1971 out:
1972         ptlrpc_req_finished(req);
1973         return rc;
1974 }
1975
1976 /* For RESTORE and RELEASE the mdt will take EX lock on the file layout.
1977  * So we can use early cancel on client side locks for that resource.
1978  */
1979 static inline int mdc_hsm_request_lock_to_cancel(struct obd_export *exp,
1980                                                  struct hsm_user_request *hur,
1981                                                  struct list_head *cancels)
1982 {
1983         struct hsm_user_item *hui = &hur->hur_user_item[0];
1984         struct hsm_request *req_hr = &hur->hur_request;
1985         int count = 0;
1986         int i;
1987
1988         if (req_hr->hr_action != HUA_RESTORE &&
1989             req_hr->hr_action != HUA_RELEASE)
1990                 return 0;
1991
1992         for (i = 0; i < req_hr->hr_itemcount; i++, hui++) {
1993                 if (!fid_is_sane(&hui->hui_fid))
1994                         continue;
1995                 count += mdc_resource_get_unused(exp, &hui->hui_fid, cancels,
1996                                                  LCK_EX, MDS_INODELOCK_LAYOUT);
1997         }
1998
1999         return count;
2000 }
2001
2002 static int mdc_ioc_hsm_request(struct obd_export *exp,
2003                                struct hsm_user_request *hur)
2004 {
2005         struct obd_import *imp = class_exp2cliimp(exp);
2006         struct ptlrpc_request *req;
2007         struct hsm_request *req_hr;
2008         struct hsm_user_item *req_hui;
2009         char *req_opaque;
2010         LIST_HEAD(cancels);
2011         int count;
2012         int rc;
2013         ENTRY;
2014
2015         req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
2016         if (req == NULL)
2017                 RETURN(-ENOMEM);
2018
2019         req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
2020                              hur->hur_request.hr_itemcount
2021                              * sizeof(struct hsm_user_item));
2022         req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
2023                              hur->hur_request.hr_data_len);
2024
2025         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
2026         if (rc) {
2027                 ptlrpc_request_free(req);
2028                 RETURN(rc);
2029         }
2030
2031         /* Cancel existing locks */
2032         count = mdc_hsm_request_lock_to_cancel(exp, hur, &cancels);
2033         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
2034         mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
2035
2036         /* Copy hsm_request struct */
2037         req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
2038         if (req_hr == NULL)
2039                 GOTO(out, rc = -EPROTO);
2040         *req_hr = hur->hur_request;
2041
2042         /* Copy hsm_user_item structs */
2043         req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
2044         if (req_hui == NULL)
2045                 GOTO(out, rc = -EPROTO);
2046         memcpy(req_hui, hur->hur_user_item,
2047                hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
2048
2049         /* Copy opaque field */
2050         req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
2051         if (req_opaque == NULL)
2052                 GOTO(out, rc = -EPROTO);
2053         memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
2054
2055         ptlrpc_request_set_replen(req);
2056
2057         ptlrpc_get_mod_rpc_slot(req);
2058         rc = ptlrpc_queue_wait(req);
2059         ptlrpc_put_mod_rpc_slot(req);
2060
2061         GOTO(out, rc);
2062
2063 out:
2064         ptlrpc_req_finished(req);
2065         return rc;
2066 }
2067
2068 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
2069                                 struct lustre_kernelcomm *lk);
2070
2071 static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
2072                         struct obd_quotactl *oqctl)
2073 {
2074         struct ptlrpc_request *req;
2075         struct obd_quotactl *oqc;
2076         int rc;
2077         ENTRY;
2078
2079         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_QUOTACTL);
2080         if (req == NULL)
2081                 RETURN(-ENOMEM);
2082
2083
2084         if (LUSTRE_Q_CMD_IS_POOL(oqctl->qc_cmd))
2085                 req_capsule_set_size(&req->rq_pill,
2086                                      &RMF_OBD_QUOTACTL,
2087                                      RCL_CLIENT,
2088                                      sizeof(*oqc) + LOV_MAXPOOLNAME + 1);
2089
2090         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION,
2091                                  MDS_QUOTACTL);
2092         if (rc) {
2093                 ptlrpc_request_free(req);
2094                 RETURN(rc);
2095         }
2096
2097         oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
2098         QCTL_COPY(oqc, oqctl);
2099
2100         ptlrpc_request_set_replen(req);
2101         ptlrpc_at_set_req_timeout(req);
2102
2103         rc = ptlrpc_queue_wait(req);
2104         if (rc) {
2105                 CERROR("%s: ptlrpc_queue_wait failed: rc = %d\n",
2106                        exp->exp_obd->obd_name, rc);
2107                 GOTO(out, rc);
2108         }
2109
2110         if (req->rq_repmsg &&
2111             (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) {
2112                 QCTL_COPY(oqctl, oqc);
2113         } else if (!rc) {
2114                 rc = -EPROTO;
2115                 CERROR("%s: cannot unpack obd_quotactl: rc = %d\n",
2116                         exp->exp_obd->obd_name, rc);
2117         }
2118 out:
2119         ptlrpc_req_finished(req);
2120
2121         RETURN(rc);
2122 }
2123
2124 static int mdc_ioc_swap_layouts(struct obd_export *exp,
2125                                 struct md_op_data *op_data)
2126 {
2127         LIST_HEAD(cancels);
2128         struct ptlrpc_request   *req;
2129         int                      rc, count;
2130         struct mdc_swap_layouts *msl, *payload;
2131         ENTRY;
2132
2133         msl = op_data->op_data;
2134
2135         /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
2136          * first thing it will do is to cancel the 2 layout
2137          * locks held by this client.
2138          * So the client must cancel its layout locks on the 2 fids
2139          * with the request RPC to avoid extra RPC round trips.
2140          */
2141         count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
2142                                         LCK_EX, MDS_INODELOCK_LAYOUT |
2143                                         MDS_INODELOCK_XATTR);
2144         count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
2145                                          LCK_EX, MDS_INODELOCK_LAYOUT |
2146                                          MDS_INODELOCK_XATTR);
2147
2148         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2149                                    &RQF_MDS_SWAP_LAYOUTS);
2150         if (req == NULL) {
2151                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
2152                 RETURN(-ENOMEM);
2153         }
2154
2155         rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
2156         if (rc) {
2157                 ptlrpc_request_free(req);
2158                 RETURN(rc);
2159         }
2160
2161         mdc_swap_layouts_pack(&req->rq_pill, op_data);
2162
2163         payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
2164         LASSERT(payload);
2165
2166         *payload = *msl;
2167
2168         ptlrpc_request_set_replen(req);
2169
2170         rc = ptlrpc_queue_wait(req);
2171         if (rc)
2172                 GOTO(out, rc);
2173         EXIT;
2174
2175 out:
2176         ptlrpc_req_finished(req);
2177         return rc;
2178 }
2179
2180 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2181                          void *karg, void __user *uarg)
2182 {
2183         struct obd_device *obd = exp->exp_obd;
2184         struct obd_ioctl_data *data;
2185         struct obd_import *imp = obd->u.cli.cl_import;
2186         int rc;
2187
2188         ENTRY;
2189         CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
2190                obd->obd_name, cmd, len, karg, uarg);
2191
2192         /* handle commands that do not need @karg first */
2193         switch (cmd) {
2194         case LL_IOC_GET_CONNECT_FLAGS:
2195                 if (copy_to_user(uarg, exp_connect_flags_ptr(exp),
2196                                  sizeof(*exp_connect_flags_ptr(exp))))
2197                         RETURN(-EFAULT);
2198                 RETURN(0);
2199         }
2200
2201         if (unlikely(karg == NULL))
2202                 RETURN(OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL", -EINVAL));
2203         data = karg;
2204
2205         if (!try_module_get(THIS_MODULE)) {
2206                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2207                        module_name(THIS_MODULE));
2208                 return -EINVAL;
2209         }
2210         switch (cmd) {
2211         case OBD_IOC_FID2PATH:
2212                 rc = mdc_ioc_fid2path(exp, karg);
2213                 GOTO(out, rc);
2214         case LL_IOC_HSM_CT_START:
2215                 rc = mdc_ioc_hsm_ct_start(exp, karg);
2216                 /* ignore if it was already registered on this MDS. */
2217                 if (rc == -EEXIST)
2218                         rc = 0;
2219                 GOTO(out, rc);
2220         case LL_IOC_HSM_PROGRESS:
2221                 rc = mdc_ioc_hsm_progress(exp, karg);
2222                 GOTO(out, rc);
2223         case LL_IOC_HSM_STATE_GET:
2224                 rc = mdc_ioc_hsm_state_get(exp, karg);
2225                 GOTO(out, rc);
2226         case LL_IOC_HSM_STATE_SET:
2227                 rc = mdc_ioc_hsm_state_set(exp, karg);
2228                 GOTO(out, rc);
2229         case LL_IOC_HSM_ACTION:
2230                 rc = mdc_ioc_hsm_current_action(exp, karg);
2231                 GOTO(out, rc);
2232         case LL_IOC_HSM_REQUEST:
2233                 rc = mdc_ioc_hsm_request(exp, karg);
2234                 GOTO(out, rc);
2235         case OBD_IOC_CLIENT_RECOVER:
2236                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
2237                 if (rc < 0)
2238                         GOTO(out, rc);
2239                 GOTO(out, rc = 0);
2240 #ifdef IOC_OSC_SET_ACTIVE
2241         case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
2242 #endif
2243         case OBD_IOC_SET_ACTIVE:
2244                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
2245                 GOTO(out, rc);
2246         /*
2247          * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by
2248          * LMV instead of MDC. But when the cluster is upgraded from 1.8,
2249          * there'd be no LMV layer thus we might be called here. Eventually
2250          * this code should be removed.
2251          * bz20731, LU-592.
2252          */
2253         case IOC_OBD_STATFS: {
2254                 struct obd_statfs stat_buf = {0};
2255
2256                 if (*((__u32 *) data->ioc_inlbuf2) != 0)
2257                         GOTO(out, rc = -ENODEV);
2258
2259                 /* copy UUID */
2260                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
2261                                  min((int)data->ioc_plen2,
2262                                      (int)sizeof(struct obd_uuid))))
2263                         GOTO(out, rc = -EFAULT);
2264
2265                 rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
2266                                 ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS,
2267                                 0);
2268                 if (rc != 0)
2269                         GOTO(out, rc);
2270
2271                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
2272                                      min((int) data->ioc_plen1,
2273                                          (int) sizeof(stat_buf))))
2274                         GOTO(out, rc = -EFAULT);
2275
2276                 GOTO(out, rc = 0);
2277         }
2278         case OBD_IOC_QUOTACTL: {
2279                 struct if_quotactl *qctl = karg;
2280                 struct obd_quotactl *oqctl;
2281
2282                 OBD_ALLOC_PTR(oqctl);
2283                 if (oqctl == NULL)
2284                         GOTO(out, rc = -ENOMEM);
2285
2286                 QCTL_COPY(oqctl, qctl);
2287                 rc = obd_quotactl(exp, oqctl);
2288                 if (rc == 0) {
2289                         QCTL_COPY_NO_PNAME(qctl, oqctl);
2290                         qctl->qc_valid = QC_MDTIDX;
2291                         qctl->obd_uuid = obd->u.cli.cl_target_uuid;
2292                 }
2293
2294                 OBD_FREE_PTR(oqctl);
2295                 GOTO(out, rc);
2296         }
2297         case LL_IOC_LOV_SWAP_LAYOUTS:
2298                 rc = mdc_ioc_swap_layouts(exp, karg);
2299                 GOTO(out, rc);
2300         default:
2301                 rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY);
2302                 break;
2303         }
2304 out:
2305         module_put(THIS_MODULE);
2306
2307         return rc;
2308 }
2309
2310 static int mdc_get_info_rpc(struct obd_export *exp,
2311                             u32 keylen, void *key,
2312                             u32 vallen, void *val)
2313 {
2314         struct obd_import      *imp = class_exp2cliimp(exp);
2315         struct ptlrpc_request  *req;
2316         char                   *tmp;
2317         int                     rc = -EINVAL;
2318         ENTRY;
2319
2320         req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO);
2321         if (req == NULL)
2322                 RETURN(-ENOMEM);
2323
2324         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
2325                              RCL_CLIENT, keylen);
2326         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
2327                              RCL_CLIENT, sizeof(vallen));
2328
2329         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
2330         if (rc) {
2331                 ptlrpc_request_free(req);
2332                 RETURN(rc);
2333         }
2334
2335         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
2336         memcpy(tmp, key, keylen);
2337         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
2338         memcpy(tmp, &vallen, sizeof(vallen));
2339
2340         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
2341                              RCL_SERVER, vallen);
2342         ptlrpc_request_set_replen(req);
2343
2344         /* if server failed to resolve FID, and OI scrub not able to fix it, it
2345          * will return -EINPROGRESS, ptlrpc_queue_wait() will keep retrying,
2346          * set request interruptible to avoid deadlock.
2347          */
2348         if (KEY_IS(KEY_FID2PATH))
2349                 req->rq_allow_intr = 1;
2350
2351         rc = ptlrpc_queue_wait(req);
2352         /* -EREMOTE means the get_info result is partial, and it needs to
2353          * continue on another MDT, see fid2path part in lmv_iocontrol */
2354         if (rc == 0 || rc == -EREMOTE) {
2355                 tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
2356                 memcpy(val, tmp, vallen);
2357                 if (req_capsule_rep_need_swab(&req->rq_pill)) {
2358                         if (KEY_IS(KEY_FID2PATH))
2359                                 lustre_swab_fid2path(val);
2360                 }
2361         }
2362         ptlrpc_req_finished(req);
2363
2364         RETURN(rc);
2365 }
2366
2367 static void lustre_swab_hai(struct hsm_action_item *h)
2368 {
2369         __swab32s(&h->hai_len);
2370         __swab32s(&h->hai_action);
2371         lustre_swab_lu_fid(&h->hai_fid);
2372         lustre_swab_lu_fid(&h->hai_dfid);
2373         __swab64s(&h->hai_cookie);
2374         __swab64s(&h->hai_extent.offset);
2375         __swab64s(&h->hai_extent.length);
2376         __swab64s(&h->hai_gid);
2377 }
2378
2379 static void lustre_swab_hal(struct hsm_action_list *h)
2380 {
2381         struct hsm_action_item  *hai;
2382         __u32                    i;
2383
2384         __swab32s(&h->hal_version);
2385         __swab32s(&h->hal_count);
2386         __swab32s(&h->hal_archive_id);
2387         __swab64s(&h->hal_flags);
2388         hai = hai_first(h);
2389         for (i = 0; i < h->hal_count; i++, hai = hai_next(hai))
2390                 lustre_swab_hai(hai);
2391 }
2392
2393 static void lustre_swab_kuch(struct kuc_hdr *l)
2394 {
2395         __swab16s(&l->kuc_magic);
2396         /* __u8 l->kuc_transport */
2397         __swab16s(&l->kuc_msgtype);
2398         __swab16s(&l->kuc_msglen);
2399 }
2400
2401 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
2402                                 struct lustre_kernelcomm *lk)
2403 {
2404         struct obd_import *imp = class_exp2cliimp(exp);
2405         int rc = 0;
2406
2407         if (lk->lk_group != KUC_GRP_HSM) {
2408                 CERROR("Bad copytool group %d\n", lk->lk_group);
2409                 return -EINVAL;
2410         }
2411
2412         CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd,
2413                lk->lk_uid, lk->lk_group, lk->lk_flags);
2414
2415         if (lk->lk_flags & LK_FLG_STOP) {
2416                 /* Unregister with the coordinator */
2417                 rc = mdc_ioc_hsm_ct_unregister(imp);
2418         } else {
2419                 __u32 *archives = NULL;
2420
2421                 if ((lk->lk_flags & LK_FLG_DATANR) && lk->lk_data_count > 0)
2422                         archives = lk->lk_data;
2423
2424                 rc = mdc_ioc_hsm_ct_register(imp, lk->lk_data_count, archives);
2425         }
2426
2427         return rc;
2428 }
2429
2430 /**
2431  * Send a message to any listening copytools
2432  * @param val KUC message (kuc_hdr + hsm_action_list)
2433  * @param len total length of message
2434  */
2435 static int mdc_hsm_copytool_send(const struct obd_uuid *uuid,
2436                                  size_t len, void *val)
2437 {
2438         struct kuc_hdr          *lh = (struct kuc_hdr *)val;
2439         struct hsm_action_list  *hal = (struct hsm_action_list *)(lh + 1);
2440         int                      rc;
2441         ENTRY;
2442
2443         if (len < sizeof(*lh) + sizeof(*hal)) {
2444                 CERROR("Short HSM message %zu < %zu\n", len,
2445                        sizeof(*lh) + sizeof(*hal));
2446                 RETURN(-EPROTO);
2447         }
2448         if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
2449                 lustre_swab_kuch(lh);
2450                 lustre_swab_hal(hal);
2451         } else if (lh->kuc_magic != KUC_MAGIC) {
2452                 CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC);
2453                 RETURN(-EPROTO);
2454         }
2455
2456         CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d "
2457                "on %s\n",
2458                lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype,
2459                lh->kuc_msglen, hal->hal_count, hal->hal_fsname);
2460
2461         /* Broadcast to HSM listeners */
2462         rc = libcfs_kkuc_group_put(uuid, KUC_GRP_HSM, lh);
2463
2464         RETURN(rc);
2465 }
2466
2467 /**
2468  * callback function passed to kuc for re-registering each HSM copytool
2469  * running on MDC, after MDT shutdown/recovery.
2470  * @param data copytool registration data
2471  * @param cb_arg callback argument (obd_import)
2472  */
2473 static int mdc_hsm_ct_reregister(void *data, void *cb_arg)
2474 {
2475         struct obd_import *imp = (struct obd_import *)cb_arg;
2476         struct kkuc_ct_data *kcd = data;
2477         __u32 *archives = NULL;
2478         int rc;
2479
2480         if (kcd == NULL ||
2481             (kcd->kcd_magic != KKUC_CT_DATA_ARRAY_MAGIC &&
2482              kcd->kcd_magic != KKUC_CT_DATA_BITMAP_MAGIC))
2483                 return -EPROTO;
2484
2485         if (kcd->kcd_magic == KKUC_CT_DATA_BITMAP_MAGIC) {
2486                 CDEBUG(D_HA, "%s: recover copytool registration to MDT "
2487                        "(archive=%#x)\n", imp->imp_obd->obd_name,
2488                        kcd->kcd_nr_archives);
2489         } else {
2490                 CDEBUG(D_HA, "%s: recover copytool registration to MDT "
2491                        "(archive nr = %u)\n",
2492                        imp->imp_obd->obd_name, kcd->kcd_nr_archives);
2493                 if (kcd->kcd_nr_archives != 0)
2494                         archives = kcd->kcd_archives;
2495         }
2496
2497         rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_nr_archives, archives);
2498         /* ignore error if the copytool is already registered */
2499         return (rc == -EEXIST) ? 0 : rc;
2500 }
2501
2502 static int mdc_kuc_reregister_thread(void *data)
2503 {
2504         struct obd_import *imp = data;
2505         int rc;
2506         ENTRY;
2507
2508         /* re-register HSM agents */
2509         rc = libcfs_kkuc_group_foreach(&imp->imp_obd->obd_uuid, KUC_GRP_HSM,
2510                                        mdc_hsm_ct_reregister, imp);
2511         if (rc < 0 && rc != -EEXIST)
2512                 CWARN("%s: Failed to re-register HSM agents (uuid: %s): rc = %d\n",
2513                       imp->imp_obd->obd_name,
2514                       obd_uuid2str(&imp->imp_obd->obd_uuid), rc);
2515
2516         class_import_put(imp);
2517         RETURN(rc);
2518 }
2519
2520 /**
2521  * Re-establish all kuc contexts with MDT
2522  * after MDT shutdown/recovery.
2523  * This is done in background.
2524  */
2525 static int mdc_kuc_reregister(struct obd_import *imp)
2526 {
2527         struct task_struct *task;
2528         int rc = 0;
2529
2530         class_import_get(imp);
2531         task = kthread_run(mdc_kuc_reregister_thread, imp, "kuc_reregister");
2532
2533         if (IS_ERR(task)) {
2534                 class_import_put(imp);
2535                 rc = PTR_ERR(task);
2536         }
2537
2538         return rc;
2539 }
2540
2541 static int mdc_set_info_async(const struct lu_env *env,
2542                               struct obd_export *exp,
2543                               u32 keylen, void *key,
2544                               u32 vallen, void *val,
2545                               struct ptlrpc_request_set *set)
2546 {
2547         struct obd_import       *imp = class_exp2cliimp(exp);
2548         int                      rc;
2549         ENTRY;
2550
2551         if (KEY_IS(KEY_READ_ONLY)) {
2552                 if (vallen != sizeof(int))
2553                         RETURN(-EINVAL);
2554
2555                 spin_lock(&imp->imp_lock);
2556                 if (*((int *)val)) {
2557                         imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY;
2558                         imp->imp_connect_data.ocd_connect_flags |=
2559                                                         OBD_CONNECT_RDONLY;
2560                 } else {
2561                         imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY;
2562                         imp->imp_connect_data.ocd_connect_flags &=
2563                                                         ~OBD_CONNECT_RDONLY;
2564                 }
2565                 spin_unlock(&imp->imp_lock);
2566
2567                 rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2568                                        keylen, key, vallen, val, set);
2569                 RETURN(rc);
2570         }
2571         if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2572                 rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION,
2573                                        keylen, key, vallen, val, set);
2574                 RETURN(rc);
2575         }
2576         if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) {
2577                 rc = mdc_hsm_copytool_send(&imp->imp_obd->obd_uuid, vallen,
2578                                            val);
2579                 RETURN(rc);
2580         }
2581
2582         if (KEY_IS(KEY_DEFAULT_EASIZE)) {
2583                 __u32 *default_easize = val;
2584
2585                 exp->exp_obd->u.cli.cl_default_mds_easize = *default_easize;
2586                 RETURN(0);
2587         }
2588
2589         rc = osc_set_info_async(env, exp, keylen, key, vallen, val, set);
2590         RETURN(rc);
2591 }
2592
2593 static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
2594                         __u32 keylen, void *key, __u32 *vallen, void *val)
2595 {
2596         int rc = -EINVAL;
2597
2598         if (KEY_IS(KEY_MAX_EASIZE)) {
2599                 __u32 mdsize, *max_easize;
2600
2601                 if (*vallen != sizeof(int))
2602                         RETURN(-EINVAL);
2603                 mdsize = *(__u32 *)val;
2604                 if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
2605                         exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
2606                 max_easize = val;
2607                 *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
2608                 RETURN(0);
2609         } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
2610                 __u32 *default_easize;
2611
2612                 if (*vallen != sizeof(int))
2613                         RETURN(-EINVAL);
2614                 default_easize = val;
2615                 *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
2616                 RETURN(0);
2617         } else if (KEY_IS(KEY_CONN_DATA)) {
2618                 struct obd_import *imp = class_exp2cliimp(exp);
2619                 struct obd_connect_data *data = val;
2620
2621                 if (*vallen != sizeof(*data))
2622                         RETURN(-EINVAL);
2623
2624                 *data = imp->imp_connect_data;
2625                 RETURN(0);
2626         } else if (KEY_IS(KEY_TGT_COUNT)) {
2627                 *((__u32 *)val) = 1;
2628                 RETURN(0);
2629         }
2630
2631         rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val);
2632
2633         RETURN(rc);
2634 }
2635
2636 static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
2637                      struct ptlrpc_request **request)
2638 {
2639         struct ptlrpc_request *req;
2640         int                    rc;
2641         ENTRY;
2642
2643         *request = NULL;
2644         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC);
2645         if (req == NULL)
2646                 RETURN(-ENOMEM);
2647
2648         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
2649         if (rc) {
2650                 ptlrpc_request_free(req);
2651                 RETURN(rc);
2652         }
2653
2654         mdc_pack_body(&req->rq_pill, fid, 0, 0, -1, 0);
2655
2656         ptlrpc_request_set_replen(req);
2657
2658         rc = ptlrpc_queue_wait(req);
2659         if (rc)
2660                 ptlrpc_req_finished(req);
2661         else
2662                 *request = req;
2663         RETURN(rc);
2664 }
2665
/* Async arguments attached to an MDS_RMFID request by mdc_rmfid(). */
struct mdc_rmfid_args {
        /* caller's buffer that mdc_rmfid_interpret() copies the reply into */
        int     *mra_rcs;
        /* entry count, taken from fa_nr of the submitted FID array */
        int      mra_nr;
};
2670
2671 static int mdc_rmfid_interpret(const struct lu_env *env,
2672                                struct ptlrpc_request *req,
2673                                void *args, int rc)
2674 {
2675         struct mdc_rmfid_args *aa;
2676         int *rcs, size;
2677         ENTRY;
2678
2679         if (!rc) {
2680                 aa = ptlrpc_req_async_args(aa, req);
2681
2682                 size = req_capsule_get_size(&req->rq_pill, &RMF_RCS,
2683                                             RCL_SERVER);
2684                 LASSERT(size == sizeof(int) * aa->mra_nr);
2685                 rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
2686                 LASSERT(rcs);
2687                 LASSERT(aa->mra_rcs);
2688                 LASSERT(aa->mra_nr);
2689                 memcpy(aa->mra_rcs, rcs, size);
2690         }
2691
2692         RETURN(rc);
2693 }
2694
2695 static int mdc_rmfid(struct obd_export *exp, struct fid_array *fa,
2696                      int *rcs, struct ptlrpc_request_set *set)
2697 {
2698         struct ptlrpc_request *req;
2699         struct mdc_rmfid_args *aa;
2700         struct mdt_body *b;
2701         struct lu_fid *tmp;
2702         int rc, flen;
2703         ENTRY;
2704
2705         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_RMFID);
2706         if (req == NULL)
2707                 RETURN(-ENOMEM);
2708
2709         flen = fa->fa_nr * sizeof(struct lu_fid);
2710         req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY,
2711                              RCL_CLIENT, flen);
2712         req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY,
2713                              RCL_SERVER, flen);
2714         req_capsule_set_size(&req->rq_pill, &RMF_RCS,
2715                              RCL_SERVER, fa->fa_nr * sizeof(__u32));
2716         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_RMFID);
2717         if (rc) {
2718                 ptlrpc_request_free(req);
2719                 RETURN(rc);
2720         }
2721         tmp = req_capsule_client_get(&req->rq_pill, &RMF_FID_ARRAY);
2722         memcpy(tmp, fa->fa_fids, flen);
2723
2724         mdc_pack_body(&req->rq_pill, NULL, 0, 0, -1, 0);
2725         b = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
2726         b->mbo_ctime = ktime_get_real_seconds();
2727
2728         ptlrpc_request_set_replen(req);
2729
2730         LASSERT(rcs);
2731         aa = ptlrpc_req_async_args(aa, req);
2732         aa->mra_rcs = rcs;
2733         aa->mra_nr = fa->fa_nr;
2734         req->rq_interpret_reply = mdc_rmfid_interpret;
2735
2736         ptlrpc_set_add_req(set, req);
2737         ptlrpc_check_set(NULL, set);
2738
2739         RETURN(rc);
2740 }
2741
2742 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
2743                             enum obd_import_event event)
2744 {
2745         struct client_obd *cli;
2746         int rc = 0;
2747
2748         ENTRY;
2749         if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd))
2750                 RETURN(-ENODEV);
2751
2752         cli = &obd->u.cli;
2753         if (!cli)
2754                 RETURN(-ENODEV);
2755
2756         switch (event) {
2757         case IMP_EVENT_DISCON:
2758                 spin_lock(&cli->cl_loi_list_lock);
2759                 cli->cl_avail_grant = 0;
2760                 cli->cl_lost_grant = 0;
2761                 spin_unlock(&cli->cl_loi_list_lock);
2762                 break;
2763         case IMP_EVENT_INACTIVE:
2764                 /*
2765                  * Flush current sequence to make client obtain new one
2766                  * from server in case of disconnect/reconnect.
2767                  */
2768                 down_read(&cli->cl_seq_rwsem);
2769                 if (cli->cl_seq)
2770                         seq_client_flush(cli->cl_seq);
2771                 up_read(&cli->cl_seq_rwsem);
2772
2773                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
2774                 break;
2775         case IMP_EVENT_INVALIDATE: {
2776                 struct ldlm_namespace *ns = obd->obd_namespace;
2777                 struct lu_env *env;
2778                 __u16 refcheck;
2779
2780                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2781
2782                 env = cl_env_get(&refcheck);
2783                 if (!IS_ERR(env)) {
2784                         /* Reset grants. All pages go to failing rpcs due to
2785                          * the invalid import.
2786                          */
2787                         osc_io_unplug(env, cli, NULL);
2788
2789                         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2790                                                  osc_ldlm_resource_invalidate,
2791                                                  env, 0);
2792                         cl_env_put(env, &refcheck);
2793                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2794                 } else {
2795                         rc = PTR_ERR(env);
2796                 }
2797                 break;
2798         }
2799         case IMP_EVENT_ACTIVE:
2800                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
2801                 /* redo the kuc registration after reconnecting */
2802                 if (rc == 0)
2803                         rc = mdc_kuc_reregister(imp);
2804                 break;
2805         case IMP_EVENT_OCD: {
2806                 struct obd_connect_data *ocd = &imp->imp_connect_data;
2807
2808                 if (OCD_HAS_FLAG(ocd, GRANT))
2809                         osc_init_grant(cli, ocd);
2810
2811                 md_init_ea_size(obd->obd_self_export, ocd->ocd_max_easize, 0);
2812                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
2813                 break;
2814         }
2815         case IMP_EVENT_DEACTIVATE:
2816         case IMP_EVENT_ACTIVATE:
2817                 break;
2818         default:
2819                 CERROR("Unknown import event %x\n", event);
2820                 LBUG();
2821         }
2822         RETURN(rc);
2823 }
2824
2825 int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
2826                   struct lu_fid *fid, struct md_op_data *op_data)
2827 {
2828         struct client_obd *cli = &exp->exp_obd->u.cli;
2829         int rc = -EIO;
2830
2831         ENTRY;
2832
2833         down_read(&cli->cl_seq_rwsem);
2834         if (cli->cl_seq)
2835                 rc = seq_client_alloc_fid(env, cli->cl_seq, fid);
2836         up_read(&cli->cl_seq_rwsem);
2837
2838         RETURN(rc);
2839 }
2840
2841 static struct obd_uuid *mdc_get_uuid(struct obd_export *exp)
2842 {
2843         struct client_obd *cli = &exp->exp_obd->u.cli;
2844         return &cli->cl_target_uuid;
2845 }
2846
2847 /**
2848  * Determine whether the lock can be canceled before replaying it during
2849  * recovery, non zero value will be return if the lock can be canceled,
2850  * or zero returned for not
2851  */
2852 static int mdc_cancel_weight(struct ldlm_lock *lock)
2853 {
2854         if (lock->l_resource->lr_type != LDLM_IBITS)
2855                 RETURN(0);
2856
2857         /* FIXME: if we ever get into a situation where there are too many
2858          * opened files with open locks on a single node, then we really
2859          * should replay these open locks to reget it */
2860         if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN)
2861                 RETURN(0);
2862
2863         /* Special case for DoM locks, cancel only unused and granted locks */
2864         if (ldlm_has_dom(lock) &&
2865             (lock->l_granted_mode != lock->l_req_mode ||
2866              osc_ldlm_weigh_ast(lock) != 0))
2867                 RETURN(0);
2868
2869         RETURN(1);
2870 }
2871
2872 static int mdc_resource_inode_free(struct ldlm_resource *res)
2873 {
2874         if (res->lr_lvb_inode)
2875                 res->lr_lvb_inode = NULL;
2876
2877         return 0;
2878 }
2879
2880 static struct ldlm_valblock_ops inode_lvbo = {
2881         .lvbo_free = mdc_resource_inode_free
2882 };
2883
2884 static int mdc_llog_init(struct obd_device *obd)
2885 {
2886         struct obd_llog_group   *olg = &obd->obd_olg;
2887         struct llog_ctxt        *ctxt;
2888         int                      rc;
2889
2890         ENTRY;
2891
2892         rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, obd,
2893                         &llog_client_ops);
2894         if (rc < 0)
2895                 RETURN(rc);
2896
2897         ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
2898         llog_initiator_connect(ctxt);
2899         llog_ctxt_put(ctxt);
2900
2901         RETURN(0);
2902 }
2903
2904 static void mdc_llog_finish(struct obd_device *obd)
2905 {
2906         struct llog_ctxt *ctxt;
2907
2908         ENTRY;
2909
2910         ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
2911         if (ctxt != NULL)
2912                 llog_cleanup(NULL, ctxt);
2913
2914         EXIT;
2915 }
2916
2917 int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
2918 {
2919         int rc;
2920
2921         ENTRY;
2922
2923         rc = osc_setup_common(obd, cfg);
2924         if (rc < 0)
2925                 RETURN(rc);
2926
2927         rc = mdc_tunables_init(obd);
2928         if (rc)
2929                 GOTO(err_osc_cleanup, rc);
2930
2931         obd->u.cli.cl_dom_min_inline_repsize = MDC_DOM_DEF_INLINE_REPSIZE;
2932         obd->u.cli.cl_lsom_update = true;
2933
2934         ns_register_cancel(obd->obd_namespace, mdc_cancel_weight);
2935
2936         obd->obd_namespace->ns_lvbo = &inode_lvbo;
2937
2938         rc = mdc_llog_init(obd);
2939         if (rc) {
2940                 CERROR("%s: failed to setup llogging subsystems: rc = %d\n",
2941                        obd->obd_name, rc);
2942                 GOTO(err_llog_cleanup, rc);
2943         }
2944
2945         rc = mdc_changelog_cdev_init(obd);
2946         if (rc) {
2947                 CERROR("%s: failed to setup changelog char device: rc = %d\n",
2948                        obd->obd_name, rc);
2949                 GOTO(err_changelog_cleanup, rc);
2950         }
2951
2952         RETURN(rc);
2953
2954 err_changelog_cleanup:
2955         mdc_llog_finish(obd);
2956 err_llog_cleanup:
2957         lprocfs_free_md_stats(obd);
2958         ptlrpc_lprocfs_unregister_obd(obd);
2959 err_osc_cleanup:
2960         osc_cleanup_common(obd);
2961         return rc;
2962 }
2963
2964 /* Initialize the default and maximum LOV EA sizes.  This allows
2965  * us to make MDS RPCs with large enough reply buffers to hold a default
2966  * sized EA without having to calculate this (via a call into the
2967  * LOV + OSCs) each time we make an RPC.  The maximum size is also tracked
2968  * but not used to avoid wastefully vmalloc()'ing large reply buffers when
2969  * a large number of stripes is possible.  If a larger reply buffer is
2970  * required it will be reallocated in the ptlrpc layer due to overflow.
2971  */
2972 static int mdc_init_ea_size(struct obd_export *exp, __u32 easize,
2973                             __u32 def_easize)
2974 {
2975         struct obd_device *obd = exp->exp_obd;
2976         struct client_obd *cli = &obd->u.cli;
2977         ENTRY;
2978
2979         if (cli->cl_max_mds_easize < easize)
2980                 cli->cl_max_mds_easize = easize;
2981
2982         if (cli->cl_default_mds_easize < def_easize)
2983                 cli->cl_default_mds_easize = def_easize;
2984
2985         RETURN(0);
2986 }
2987
2988 static int mdc_precleanup(struct obd_device *obd)
2989 {
2990         ENTRY;
2991
2992         osc_precleanup_common(obd);
2993
2994         mdc_changelog_cdev_finish(obd);
2995         mdc_llog_finish(obd);
2996         lprocfs_free_md_stats(obd);
2997         ptlrpc_lprocfs_unregister_obd(obd);
2998
2999         RETURN(0);
3000 }
3001
3002 static int mdc_cleanup(struct obd_device *obd)
3003 {
3004         struct client_obd *cli = &obd->u.cli;
3005         LASSERT(cli->cl_mod_rpcs_in_flight == 0);
3006         return osc_cleanup_common(obd);
3007 }
3008
3009 static const struct obd_ops mdc_obd_ops = {
3010         .o_owner            = THIS_MODULE,
3011         .o_setup            = mdc_setup,
3012         .o_precleanup       = mdc_precleanup,
3013         .o_cleanup          = mdc_cleanup,
3014         .o_add_conn         = client_import_add_conn,
3015         .o_del_conn         = client_import_del_conn,
3016         .o_connect          = client_connect_import,
3017         .o_reconnect        = osc_reconnect,
3018         .o_disconnect       = osc_disconnect,
3019         .o_iocontrol        = mdc_iocontrol,
3020         .o_set_info_async   = mdc_set_info_async,
3021         .o_statfs           = mdc_statfs,
3022         .o_statfs_async     = mdc_statfs_async,
3023         .o_fid_init         = client_fid_init,
3024         .o_fid_fini         = client_fid_fini,
3025         .o_fid_alloc        = mdc_fid_alloc,
3026         .o_import_event     = mdc_import_event,
3027         .o_get_info         = mdc_get_info,
3028         .o_get_uuid         = mdc_get_uuid,
3029         .o_quotactl         = mdc_quotactl,
3030 };
3031
3032 static const struct md_ops mdc_md_ops = {
3033         .m_get_root         = mdc_get_root,
3034         .m_null_inode       = mdc_null_inode,
3035         .m_close            = mdc_close,
3036         .m_create           = mdc_create,
3037         .m_enqueue          = mdc_enqueue,
3038         .m_getattr          = mdc_getattr,
3039         .m_getattr_name     = mdc_getattr_name,
3040         .m_intent_lock      = mdc_intent_lock,
3041         .m_link             = mdc_link,
3042         .m_rename           = mdc_rename,
3043         .m_setattr          = mdc_setattr,
3044         .m_setxattr         = mdc_setxattr,
3045         .m_getxattr         = mdc_getxattr,
3046         .m_fsync                = mdc_fsync,
3047         .m_file_resync          = mdc_file_resync,
3048         .m_read_page            = mdc_read_page,
3049         .m_unlink           = mdc_unlink,
3050         .m_cancel_unused    = mdc_cancel_unused,
3051         .m_init_ea_size     = mdc_init_ea_size,
3052         .m_set_lock_data    = mdc_set_lock_data,
3053         .m_lock_match       = mdc_lock_match,
3054         .m_get_lustre_md    = mdc_get_lustre_md,
3055         .m_set_open_replay_data = mdc_set_open_replay_data,
3056         .m_clear_open_replay_data = mdc_clear_open_replay_data,
3057         .m_intent_getattr_async = mdc_intent_getattr_async,
3058         .m_revalidate_lock      = mdc_revalidate_lock,
3059         .m_rmfid                = mdc_rmfid,
3060         .m_batch_create         = cli_batch_create,
3061         .m_batch_stop           = cli_batch_stop,
3062         .m_batch_flush          = cli_batch_flush,
3063         .m_batch_add            = mdc_batch_add,
3064 };
3065
3066 dev_t mdc_changelog_dev;
3067 struct class *mdc_changelog_class;
3068 static int __init mdc_init(void)
3069 {
3070         int rc = 0;
3071
3072         rc = libcfs_setup();
3073         if (rc)
3074                 return rc;
3075
3076         rc = alloc_chrdev_region(&mdc_changelog_dev, 0,
3077                                  MDC_CHANGELOG_DEV_COUNT,
3078                                  MDC_CHANGELOG_DEV_NAME);
3079         if (rc)
3080                 return rc;
3081
3082         mdc_changelog_class = ll_class_create(MDC_CHANGELOG_DEV_NAME);
3083         if (IS_ERR(mdc_changelog_class)) {
3084                 rc = PTR_ERR(mdc_changelog_class);
3085                 goto out_dev;
3086         }
3087
3088         rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, true,
3089                                  LUSTRE_MDC_NAME, &mdc_device_type);
3090         if (rc)
3091                 goto out_class;
3092
3093         return 0;
3094
3095 out_class:
3096         class_destroy(mdc_changelog_class);
3097 out_dev:
3098         unregister_chrdev_region(mdc_changelog_dev, MDC_CHANGELOG_DEV_COUNT);
3099         return rc;
3100 }
3101
3102 static void __exit mdc_exit(void)
3103 {
3104         class_unregister_type(LUSTRE_MDC_NAME);
3105         class_destroy(mdc_changelog_class);
3106         unregister_chrdev_region(mdc_changelog_dev, MDC_CHANGELOG_DEV_COUNT);
3107         idr_destroy(&mdc_changelog_minor_idr);
3108 }
3109
3110 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3111 MODULE_DESCRIPTION("Lustre Metadata Client");
3112 MODULE_VERSION(LUSTRE_VERSION_STRING);
3113 MODULE_LICENSE("GPL");
3114
3115 module_init(mdc_init);
3116 module_exit(mdc_exit);