Whamcloud - gitweb
LU-15971 llite: match lock in corresponding namespace
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_LMV
33
34 #include <linux/file.h>
35 #include <linux/module.h>
36 #include <linux/init.h>
37 #include <linux/user_namespace.h>
38 #include <linux/uidgid.h>
39 #include <linux/slab.h>
40 #include <linux/pagemap.h>
41 #include <linux/mm.h>
42 #include <linux/math64.h>
43 #include <linux/seq_file.h>
44 #include <linux/namei.h>
45
46 #include <obd_support.h>
47 #include <lustre_lib.h>
48 #include <lustre_net.h>
49 #include <obd_class.h>
50 #include <lustre_lmv.h>
51 #include <lprocfs_status.h>
52 #include <cl_object.h>
53 #include <lustre_fid.h>
54 #include <uapi/linux/lustre/lustre_ioctl.h>
55 #include <lustre_kernelcomm.h>
56 #include "lmv_internal.h"
57
58 static int lmv_check_connect(struct obd_device *obd);
59 static inline bool lmv_op_default_rr_mkdir(const struct md_op_data *op_data);
60
61 void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt,
62                          int activate)
63 {
64         if (tgt->ltd_active == activate)
65                 return;
66
67         tgt->ltd_active = activate;
68         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count +=
69                 (activate ? 1 : -1);
70
71         tgt->ltd_exp->exp_obd->obd_inactive = !activate;
72 }
73
74 /**
75  * Error codes:
76  *
77  *  -EINVAL  : UUID can't be found in the LMV's target list
78  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
79  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
80  */
81 static int lmv_set_mdc_active(struct lmv_obd *lmv,
82                               const struct obd_uuid *uuid,
83                               int activate)
84 {
85         struct lu_tgt_desc *tgt = NULL;
86         struct obd_device *obd;
87         int rc = 0;
88
89         ENTRY;
90
91         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
92                         lmv, uuid->uuid, activate);
93
94         spin_lock(&lmv->lmv_lock);
95         lmv_foreach_connected_tgt(lmv, tgt) {
96                 CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n",
97                        tgt->ltd_index, tgt->ltd_uuid.uuid,
98                        tgt->ltd_exp->exp_handle.h_cookie);
99
100                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
101                         break;
102         }
103
104         if (!tgt)
105                 GOTO(out_lmv_lock, rc = -EINVAL);
106
107         obd = class_exp2obd(tgt->ltd_exp);
108         if (obd == NULL)
109                 GOTO(out_lmv_lock, rc = -ENOTCONN);
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, tgt->ltd_index);
114         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115
116         if (tgt->ltd_active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 GOTO(out_lmv_lock, rc);
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123                activate ? "" : "in");
124         lmv_activate_target(lmv, tgt, activate);
125         EXIT;
126
127  out_lmv_lock:
128         spin_unlock(&lmv->lmv_lock);
129         return rc;
130 }
131
132 static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
133 {
134         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
135         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
136
137         return tgt ? obd_get_uuid(tgt->ltd_exp) : NULL;
138 }
139
140 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
141                       enum obd_notify_event ev)
142 {
143         struct obd_connect_data *conn_data;
144         struct lmv_obd          *lmv = &obd->u.lmv;
145         struct obd_uuid         *uuid;
146         int                      rc = 0;
147         ENTRY;
148
149         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
150                 CERROR("unexpected notification of %s %s!\n",
151                        watched->obd_type->typ_name,
152                        watched->obd_name);
153                 RETURN(-EINVAL);
154         }
155
156         uuid = &watched->u.cli.cl_target_uuid;
157         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
158                 /*
159                  * Set MDC as active before notifying the observer, so the
160                  * observer can use the MDC normally.
161                  */
162                 rc = lmv_set_mdc_active(lmv, uuid,
163                                         ev == OBD_NOTIFY_ACTIVE);
164                 if (rc) {
165                         CERROR("%sactivation of %s failed: %d\n",
166                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
167                                uuid->uuid, rc);
168                         RETURN(rc);
169                 }
170         } else if (ev == OBD_NOTIFY_OCD) {
171                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
172                 /*
173                  * XXX: Make sure that ocd_connect_flags from all targets are
174                  * the same. Otherwise one of MDTs runs wrong version or
175                  * something like this.  --umka
176                  */
177                 obd->obd_self_export->exp_connect_data = *conn_data;
178         }
179
180         /*
181          * Pass the notification up the chain.
182          */
183         if (obd->obd_observer)
184                 rc = obd_notify(obd->obd_observer, watched, ev);
185
186         RETURN(rc);
187 }
188
189 static int lmv_connect(const struct lu_env *env,
190                        struct obd_export **pexp, struct obd_device *obd,
191                        struct obd_uuid *cluuid, struct obd_connect_data *data,
192                        void *localdata)
193 {
194         struct lmv_obd *lmv = &obd->u.lmv;
195         struct lustre_handle conn = { 0 };
196         struct obd_export *exp;
197         int rc;
198         ENTRY;
199
200         rc = class_connect(&conn, obd, cluuid);
201         if (rc) {
202                 CERROR("class_connection() returned %d\n", rc);
203                 RETURN(rc);
204         }
205
206         exp = class_conn2export(&conn);
207
208         lmv->connected = 0;
209         lmv->conn_data = *data;
210         lmv->lmv_cache = localdata;
211
212         lmv->lmv_tgts_kobj = kobject_create_and_add("target_obds",
213                                                     &obd->obd_kset.kobj);
214         if (!lmv->lmv_tgts_kobj) {
215                 CERROR("%s: cannot create /sys/fs/lustre/%s/%s/target_obds\n",
216                        obd->obd_name, obd->obd_type->typ_name, obd->obd_name);
217         }
218
219         rc = lmv_check_connect(obd);
220         if (rc != 0)
221                 GOTO(out_sysfs, rc);
222
223         *pexp = exp;
224
225         RETURN(rc);
226
227 out_sysfs:
228         if (lmv->lmv_tgts_kobj)
229                 kobject_put(lmv->lmv_tgts_kobj);
230
231         class_disconnect(exp);
232
233         return rc;
234 }
235
236 static int lmv_init_ea_size(struct obd_export *exp, __u32 easize,
237                             __u32 def_easize)
238 {
239         struct obd_device *obd = exp->exp_obd;
240         struct lmv_obd *lmv = &obd->u.lmv;
241         struct lmv_tgt_desc *tgt;
242         int change = 0;
243         int rc = 0;
244
245         ENTRY;
246
247         if (lmv->max_easize < easize) {
248                 lmv->max_easize = easize;
249                 change = 1;
250         }
251         if (lmv->max_def_easize < def_easize) {
252                 lmv->max_def_easize = def_easize;
253                 change = 1;
254         }
255
256         if (change == 0)
257                 RETURN(0);
258
259         if (lmv->connected == 0)
260                 RETURN(0);
261
262         lmv_foreach_connected_tgt(lmv, tgt) {
263                 if (!tgt->ltd_active)
264                         continue;
265
266                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize);
267                 if (rc) {
268                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
269                                " rc = %d\n", obd->obd_name, tgt->ltd_index, rc);
270                         break;
271                 }
272         }
273         RETURN(rc);
274 }
275
276 #define MAX_STRING_SIZE 128
277
278 static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
279 {
280         struct lmv_obd *lmv = &obd->u.lmv;
281         struct obd_device *mdc_obd;
282         struct obd_export *mdc_exp;
283         struct lu_fld_target target;
284         int  rc;
285         ENTRY;
286
287         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
288                                         &obd->obd_uuid);
289         if (!mdc_obd) {
290                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
291                 RETURN(-EINVAL);
292         }
293
294         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s\n",
295                mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
296                tgt->ltd_uuid.uuid, obd->obd_uuid.uuid);
297
298         if (!mdc_obd->obd_set_up) {
299                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
300                 RETURN(-EINVAL);
301         }
302
303         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &obd->obd_uuid,
304                          &lmv->conn_data, lmv->lmv_cache);
305         if (rc) {
306                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
307                 RETURN(rc);
308         }
309
310         /*
311          * Init fid sequence client for this mdc and add new fld target.
312          */
313         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
314         if (rc)
315                 RETURN(rc);
316
317         target.ft_srv = NULL;
318         target.ft_exp = mdc_exp;
319         target.ft_idx = tgt->ltd_index;
320
321         fld_client_add_target(&lmv->lmv_fld, &target);
322
323         rc = obd_register_observer(mdc_obd, obd);
324         if (rc) {
325                 obd_disconnect(mdc_exp);
326                 CERROR("target %s register_observer error %d\n",
327                        tgt->ltd_uuid.uuid, rc);
328                 RETURN(rc);
329         }
330
331         if (obd->obd_observer) {
332                 /*
333                  * Tell the observer about the new target.
334                  */
335                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
336                                 OBD_NOTIFY_ACTIVE);
337                 if (rc) {
338                         obd_disconnect(mdc_exp);
339                         RETURN(rc);
340                 }
341         }
342
343         tgt->ltd_active = 1;
344         tgt->ltd_exp = mdc_exp;
345         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count++;
346
347         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize);
348
349         rc = lu_qos_add_tgt(&lmv->lmv_qos, tgt);
350         if (rc) {
351                 obd_disconnect(mdc_exp);
352                 RETURN(rc);
353         }
354
355         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
356                mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
357                atomic_read(&obd->obd_refcount));
358
359         lmv_statfs_check_update(obd, tgt);
360
361         if (lmv->lmv_tgts_kobj)
362                 /* Even if we failed to create the link, that's fine */
363                 rc = sysfs_create_link(lmv->lmv_tgts_kobj,
364                                        &mdc_obd->obd_kset.kobj,
365                                        mdc_obd->obd_name);
366         RETURN(0);
367 }
368
369 static void lmv_del_target(struct lmv_obd *lmv, struct lu_tgt_desc *tgt)
370 {
371         LASSERT(tgt);
372         ltd_del_tgt(&lmv->lmv_mdt_descs, tgt);
373         OBD_FREE_PTR(tgt);
374 }
375
376 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
377                            __u32 index, int gen)
378 {
379         struct obd_device *mdc_obd;
380         struct lmv_obd *lmv = &obd->u.lmv;
381         struct lmv_tgt_desc *tgt;
382         struct lu_tgt_descs *ltd = &lmv->lmv_mdt_descs;
383         int rc = 0;
384
385         ENTRY;
386
387         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
388         mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
389                                         &obd->obd_uuid);
390         if (!mdc_obd) {
391                 CERROR("%s: Target %s not attached: rc = %d\n",
392                        obd->obd_name, uuidp->uuid, -EINVAL);
393                 RETURN(-EINVAL);
394         }
395
396         OBD_ALLOC_PTR(tgt);
397         if (!tgt)
398                 RETURN(-ENOMEM);
399
400         mutex_init(&tgt->ltd_fid_mutex);
401         tgt->ltd_index = index;
402         tgt->ltd_uuid = *uuidp;
403         tgt->ltd_active = 0;
404
405         mutex_lock(&ltd->ltd_mutex);
406         rc = ltd_add_tgt(ltd, tgt);
407         mutex_unlock(&ltd->ltd_mutex);
408
409         if (rc)
410                 GOTO(out_tgt, rc);
411
412         if (!lmv->connected)
413                 /* lmv_check_connect() will connect this target. */
414                 RETURN(0);
415
416         rc = lmv_connect_mdc(obd, tgt);
417         if (!rc) {
418                 int easize = sizeof(struct lmv_stripe_md) +
419                         lmv->lmv_mdt_count * sizeof(struct lu_fid);
420
421                 lmv_init_ea_size(obd->obd_self_export, easize, 0);
422         }
423
424         RETURN(rc);
425
426 out_tgt:
427         OBD_FREE_PTR(tgt);
428         return rc;
429 }
430
431 static int lmv_check_connect(struct obd_device *obd)
432 {
433         struct lmv_obd *lmv = &obd->u.lmv;
434         struct lmv_tgt_desc *tgt;
435         int easize;
436         int rc;
437
438         ENTRY;
439
440         if (lmv->connected)
441                 RETURN(0);
442
443         mutex_lock(&lmv->lmv_mdt_descs.ltd_mutex);
444         if (lmv->connected)
445                 GOTO(unlock, rc = 0);
446
447         if (!lmv->lmv_mdt_count) {
448                 CERROR("%s: no targets configured: rc = -EINVAL\n",
449                        obd->obd_name);
450                 GOTO(unlock, rc = -EINVAL);
451         }
452
453         if (!lmv_mdt0_inited(lmv)) {
454                 CERROR("%s: no target configured for index 0: rc = -EINVAL.\n",
455                        obd->obd_name);
456                 GOTO(unlock, rc = -EINVAL);
457         }
458
459         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
460                obd->obd_uuid.uuid, obd->obd_name);
461
462         lmv_foreach_tgt(lmv, tgt) {
463                 rc = lmv_connect_mdc(obd, tgt);
464                 if (rc)
465                         GOTO(out_disc, rc);
466         }
467
468         lmv->connected = 1;
469         easize = lmv_mds_md_size(lmv->lmv_mdt_count, LMV_MAGIC);
470         lmv_init_ea_size(obd->obd_self_export, easize, 0);
471         EXIT;
472 unlock:
473         mutex_unlock(&lmv->lmv_mdt_descs.ltd_mutex);
474
475         return rc;
476
477 out_disc:
478         lmv_foreach_tgt(lmv, tgt) {
479                 tgt->ltd_active = 0;
480                 if (!tgt->ltd_exp)
481                         continue;
482
483                 --lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count;
484                 obd_disconnect(tgt->ltd_exp);
485         }
486
487         goto unlock;
488 }
489
490 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
491 {
492         struct lmv_obd *lmv = &obd->u.lmv;
493         struct obd_device *mdc_obd;
494         int rc;
495         ENTRY;
496
497         LASSERT(tgt != NULL);
498         LASSERT(obd != NULL);
499
500         mdc_obd = class_exp2obd(tgt->ltd_exp);
501
502         if (mdc_obd) {
503                 mdc_obd->obd_force = obd->obd_force;
504                 mdc_obd->obd_fail = obd->obd_fail;
505                 mdc_obd->obd_no_recov = obd->obd_no_recov;
506
507                 if (lmv->lmv_tgts_kobj)
508                         sysfs_remove_link(lmv->lmv_tgts_kobj,
509                                           mdc_obd->obd_name);
510         }
511
512         rc = lu_qos_del_tgt(&lmv->lmv_qos, tgt);
513         if (rc)
514                 CERROR("%s: Can't del target from QoS table: rc = %d\n",
515                        tgt->ltd_exp->exp_obd->obd_name, rc);
516
517         rc = fld_client_del_target(&lmv->lmv_fld, tgt->ltd_index);
518         if (rc)
519                 CERROR("%s: Can't del fld targets: rc = %d\n",
520                        tgt->ltd_exp->exp_obd->obd_name, rc);
521
522         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
523         if (rc)
524                 CERROR("%s: Can't finalize fids factory: rc = %d\n",
525                        tgt->ltd_exp->exp_obd->obd_name, rc);
526
527         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
528                tgt->ltd_exp->exp_obd->obd_name,
529                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
530
531         lmv_activate_target(lmv, tgt, 0);
532         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
533         rc = obd_disconnect(tgt->ltd_exp);
534         if (rc) {
535                 CERROR("%s: Target %s disconnect error: rc = %d\n",
536                        tgt->ltd_exp->exp_obd->obd_name,
537                        tgt->ltd_uuid.uuid, rc);
538         }
539         tgt->ltd_exp = NULL;
540         RETURN(0);
541 }
542
543 static int lmv_disconnect(struct obd_export *exp)
544 {
545         struct obd_device *obd = class_exp2obd(exp);
546         struct lmv_obd *lmv = &obd->u.lmv;
547         struct lmv_tgt_desc *tgt;
548         int rc;
549
550         ENTRY;
551
552         lmv_foreach_connected_tgt(lmv, tgt)
553                 lmv_disconnect_mdc(obd, tgt);
554
555         if (lmv->lmv_tgts_kobj)
556                 kobject_put(lmv->lmv_tgts_kobj);
557
558         lmv->connected = 0;
559         rc = class_disconnect(exp);
560
561         RETURN(rc);
562 }
563
564 static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
565                         void __user *uarg)
566 {
567         struct obd_device *obd = class_exp2obd(exp);
568         struct lmv_obd *lmv = &obd->u.lmv;
569         struct getinfo_fid2path *gf;
570         struct lmv_tgt_desc *tgt;
571         struct getinfo_fid2path *remote_gf = NULL;
572         struct lu_fid root_fid;
573         int remote_gf_size = 0;
574         int currentisenc = 0;
575         int globalisenc = 0;
576         int rc;
577
578         gf = karg;
579         tgt = lmv_fid2tgt(lmv, &gf->gf_fid);
580         if (IS_ERR(tgt))
581                 RETURN(PTR_ERR(tgt));
582
583         root_fid = *gf->gf_u.gf_root_fid;
584         LASSERT(fid_is_sane(&root_fid));
585
586 repeat_fid2path:
587         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
588         if (rc != 0 && rc != -EREMOTE)
589                 GOTO(out_fid2path, rc);
590
591         if (gf->gf_u.gf_path[0] == '/') {
592                 /* by convention, server side (mdt_path_current()) puts
593                  * a leading '/' to tell client that we are dealing with
594                  * an encrypted file
595                  */
596                 currentisenc = 1;
597                 globalisenc = 1;
598         } else {
599                 currentisenc = 0;
600         }
601
602         /* If remote_gf != NULL, it means just building the
603          * path on the remote MDT, copy this path segment to gf.
604          */
605         if (remote_gf != NULL) {
606                 struct getinfo_fid2path *ori_gf;
607                 int oldisenc = 0;
608                 char *ptr;
609                 int len;
610
611                 ori_gf = (struct getinfo_fid2path *)karg;
612                 if (strlen(ori_gf->gf_u.gf_path) + 1 +
613                     strlen(gf->gf_u.gf_path) + 1 > ori_gf->gf_pathlen)
614                         GOTO(out_fid2path, rc = -EOVERFLOW);
615
616                 ptr = ori_gf->gf_u.gf_path;
617                 oldisenc = ptr[0] == '/';
618
619                 len = strlen(gf->gf_u.gf_path);
620                 if (len) {
621                         /* move the current path to the right to release space
622                          * for closer-to-root part
623                          */
624                         memmove(ptr + len - currentisenc + 1 + globalisenc,
625                                 ptr + oldisenc,
626                                 strlen(ori_gf->gf_u.gf_path) - oldisenc + 1);
627                         if (globalisenc)
628                                 *(ptr++) = '/';
629                         memcpy(ptr, gf->gf_u.gf_path + currentisenc,
630                                len - currentisenc);
631                         ptr[len - currentisenc] = '/';
632                 }
633         }
634
635         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
636                tgt->ltd_exp->exp_obd->obd_name,
637                gf->gf_u.gf_path, PFID(&gf->gf_fid), gf->gf_recno,
638                gf->gf_linkno);
639
640         if (rc == 0)
641                 GOTO(out_fid2path, rc);
642
643         /* sigh, has to go to another MDT to do path building further */
644         if (remote_gf == NULL) {
645                 remote_gf_size = sizeof(*remote_gf) + len - sizeof(*gf);
646                 OBD_ALLOC(remote_gf, remote_gf_size);
647                 if (remote_gf == NULL)
648                         GOTO(out_fid2path, rc = -ENOMEM);
649                 remote_gf->gf_pathlen = len - sizeof(*gf);
650         }
651
652         if (!fid_is_sane(&gf->gf_fid)) {
653                 CERROR("%s: invalid FID "DFID": rc = %d\n",
654                        tgt->ltd_exp->exp_obd->obd_name,
655                        PFID(&gf->gf_fid), -EINVAL);
656                 GOTO(out_fid2path, rc = -EINVAL);
657         }
658
659         tgt = lmv_fid2tgt(lmv, &gf->gf_fid);
660         if (IS_ERR(tgt))
661                 GOTO(out_fid2path, rc = -EINVAL);
662
663         remote_gf->gf_fid = gf->gf_fid;
664         remote_gf->gf_recno = -1;
665         remote_gf->gf_linkno = -1;
666         memset(remote_gf->gf_u.gf_path, 0, remote_gf->gf_pathlen);
667         *remote_gf->gf_u.gf_root_fid = root_fid;
668         gf = remote_gf;
669         goto repeat_fid2path;
670
671 out_fid2path:
672         if (remote_gf != NULL)
673                 OBD_FREE(remote_gf, remote_gf_size);
674         RETURN(rc);
675 }
676
677 static int lmv_hsm_req_count(struct lmv_obd *lmv,
678                              const struct hsm_user_request *hur,
679                              const struct lmv_tgt_desc *tgt_mds)
680 {
681         struct lmv_tgt_desc *curr_tgt;
682         __u32 i;
683         int nr = 0;
684
685         /* count how many requests must be sent to the given target */
686         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
687                 curr_tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[i].hui_fid);
688                 if (IS_ERR(curr_tgt))
689                         RETURN(PTR_ERR(curr_tgt));
690                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
691                         nr++;
692         }
693         return nr;
694 }
695
696 static int lmv_hsm_req_build(struct lmv_obd *lmv,
697                               struct hsm_user_request *hur_in,
698                               const struct lmv_tgt_desc *tgt_mds,
699                               struct hsm_user_request *hur_out)
700 {
701         __u32 i, nr_out;
702         struct lmv_tgt_desc *curr_tgt;
703
704         /* build the hsm_user_request for the given target */
705         hur_out->hur_request = hur_in->hur_request;
706         nr_out = 0;
707         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
708                 curr_tgt = lmv_fid2tgt(lmv, &hur_in->hur_user_item[i].hui_fid);
709                 if (IS_ERR(curr_tgt))
710                         RETURN(PTR_ERR(curr_tgt));
711                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
712                         hur_out->hur_user_item[nr_out] =
713                                                 hur_in->hur_user_item[i];
714                         nr_out++;
715                 }
716         }
717         hur_out->hur_request.hr_itemcount = nr_out;
718         memcpy(hur_data(hur_out), hur_data(hur_in),
719                hur_in->hur_request.hr_data_len);
720
721         RETURN(0);
722 }
723
724 static int lmv_hsm_ct_unregister(struct obd_device *obd, unsigned int cmd,
725                                  int len, struct lustre_kernelcomm *lk,
726                                  void __user *uarg)
727 {
728         struct lmv_obd *lmv = &obd->u.lmv;
729         struct lu_tgt_desc *tgt;
730         int rc;
731
732         ENTRY;
733
734         /* unregister request (call from llapi_hsm_copytool_fini) */
735         lmv_foreach_connected_tgt(lmv, tgt)
736                 /* best effort: try to clean as much as possible
737                  * (continue on error) */
738                 obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
739
740         /* Whatever the result, remove copytool from kuc groups.
741          * Unreached coordinators will get EPIPE on next requests
742          * and will unregister automatically.
743          */
744         rc = libcfs_kkuc_group_rem(&obd->obd_uuid, lk->lk_uid, lk->lk_group);
745
746         RETURN(rc);
747 }
748
749 static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd,
750                                int len, struct lustre_kernelcomm *lk,
751                                void __user *uarg)
752 {
753         struct lmv_obd *lmv = &obd->u.lmv;
754         struct file *filp;
755         bool any_set = false;
756         struct kkuc_ct_data *kcd;
757         size_t kcd_size;
758         struct lu_tgt_desc *tgt;
759         __u32 i;
760         int err;
761         int rc = 0;
762
763         ENTRY;
764
765         filp = fget(lk->lk_wfd);
766         if (!filp)
767                 RETURN(-EBADF);
768
769         if (lk->lk_flags & LK_FLG_DATANR)
770                 kcd_size = offsetof(struct kkuc_ct_data,
771                                     kcd_archives[lk->lk_data_count]);
772         else
773                 kcd_size = sizeof(*kcd);
774
775         OBD_ALLOC(kcd, kcd_size);
776         if (kcd == NULL)
777                 GOTO(err_fput, rc = -ENOMEM);
778
779         kcd->kcd_nr_archives = lk->lk_data_count;
780         if (lk->lk_flags & LK_FLG_DATANR) {
781                 kcd->kcd_magic = KKUC_CT_DATA_ARRAY_MAGIC;
782                 if (lk->lk_data_count > 0)
783                         memcpy(kcd->kcd_archives, lk->lk_data,
784                                sizeof(*kcd->kcd_archives) * lk->lk_data_count);
785         } else {
786                 kcd->kcd_magic = KKUC_CT_DATA_BITMAP_MAGIC;
787         }
788
789         rc = libcfs_kkuc_group_add(filp, &obd->obd_uuid, lk->lk_uid,
790                                    lk->lk_group, kcd, kcd_size);
791         OBD_FREE(kcd, kcd_size);
792         if (rc)
793                 GOTO(err_fput, rc);
794
795         /* All or nothing: try to register to all MDS.
796          * In case of failure, unregister from previous MDS,
797          * except if it because of inactive target. */
798         lmv_foreach_connected_tgt(lmv, tgt) {
799                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
800                 if (err) {
801                         if (tgt->ltd_active) {
802                                 /* permanent error */
803                                 CERROR("%s: iocontrol MDC %s on MDT"
804                                        " idx %d cmd %x: err = %d\n",
805                                        lmv2obd_dev(lmv)->obd_name,
806                                        tgt->ltd_uuid.uuid, tgt->ltd_index, cmd,
807                                        err);
808                                 rc = err;
809                                 lk->lk_flags |= LK_FLG_STOP;
810                                 i = tgt->ltd_index;
811                                 /* unregister from previous MDS */
812                                 lmv_foreach_connected_tgt(lmv, tgt) {
813                                         if (tgt->ltd_index >= i)
814                                                 break;
815
816                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
817                                                       lk, uarg);
818                                 }
819                                 GOTO(err_kkuc_rem, rc);
820                         }
821                         /* else: transient error.
822                          * kuc will register to the missing MDT
823                          * when it is back */
824                 } else {
825                         any_set = true;
826                 }
827         }
828
829         if (!any_set)
830                 /* no registration done: return error */
831                 GOTO(err_kkuc_rem, rc = -ENOTCONN);
832
833         RETURN(0);
834
835 err_kkuc_rem:
836         libcfs_kkuc_group_rem(&obd->obd_uuid, lk->lk_uid, lk->lk_group);
837
838 err_fput:
839         fput(filp);
840         return rc;
841 }
842
843 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
844                          int len, void *karg, void __user *uarg)
845 {
846         struct obd_device *obd = class_exp2obd(exp);
847         struct lmv_obd *lmv = &obd->u.lmv;
848         struct lu_tgt_desc *tgt = NULL;
849         int set = 0;
850         __u32 count = lmv->lmv_mdt_count;
851         int rc = 0;
852
853         ENTRY;
854
855         if (count == 0)
856                 RETURN(-ENOTTY);
857
858         switch (cmd) {
859         case IOC_OBD_STATFS: {
860                 struct obd_ioctl_data *data = karg;
861                 struct obd_device *mdc_obd;
862                 struct obd_statfs stat_buf = {0};
863                 __u32 index;
864
865                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
866
867                 if (index >= lmv->lmv_mdt_descs.ltd_tgts_size)
868                         RETURN(-ENODEV);
869
870                 tgt = lmv_tgt(lmv, index);
871                 if (!tgt)
872                         RETURN(-EAGAIN);
873
874                 if (!tgt->ltd_active)
875                         RETURN(-ENODATA);
876
877                 mdc_obd = class_exp2obd(tgt->ltd_exp);
878                 if (!mdc_obd)
879                         RETURN(-EINVAL);
880
881                 /* copy UUID */
882                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
883                                  min((int) data->ioc_plen2,
884                                      (int) sizeof(struct obd_uuid))))
885                         RETURN(-EFAULT);
886
887                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
888                                 ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS,
889                                 0);
890                 if (rc)
891                         RETURN(rc);
892                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
893                                  min((int) data->ioc_plen1,
894                                      (int) sizeof(stat_buf))))
895                         RETURN(-EFAULT);
896                 break;
897         }
898         case OBD_IOC_QUOTACTL: {
899                 struct if_quotactl *qctl = karg;
900                 struct obd_quotactl *oqctl;
901                 struct obd_import *imp;
902
903                 if (qctl->qc_valid == QC_MDTIDX) {
904                         tgt = lmv_tgt(lmv, qctl->qc_idx);
905                 } else if (qctl->qc_valid == QC_UUID) {
906                         lmv_foreach_tgt(lmv, tgt) {
907                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
908                                                      &qctl->obd_uuid))
909                                         continue;
910
911                                 if (!tgt->ltd_exp)
912                                         RETURN(-EINVAL);
913
914                                 break;
915                         }
916                 } else {
917                         RETURN(-EINVAL);
918                 }
919
920                 if (!tgt)
921                         RETURN(-ENODEV);
922
923                 if (!tgt->ltd_exp)
924                         RETURN(-EINVAL);
925
926                 imp = class_exp2cliimp(tgt->ltd_exp);
927                 if (!tgt->ltd_active && imp->imp_state != LUSTRE_IMP_IDLE) {
928                         qctl->qc_valid = QC_MDTIDX;
929                         qctl->obd_uuid = tgt->ltd_uuid;
930                         RETURN(-ENODATA);
931                 }
932
933                 OBD_ALLOC_PTR(oqctl);
934                 if (!oqctl)
935                         RETURN(-ENOMEM);
936
937                 QCTL_COPY(oqctl, qctl);
938                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
939                 if (rc == 0) {
940                         QCTL_COPY(qctl, oqctl);
941                         qctl->qc_valid = QC_MDTIDX;
942                         qctl->obd_uuid = tgt->ltd_uuid;
943                 }
944                 OBD_FREE_PTR(oqctl);
945                 break;
946         }
947         case LL_IOC_GET_CONNECT_FLAGS: {
948                 tgt = lmv_tgt(lmv, 0);
949                 rc = -ENODATA;
950                 if (tgt && tgt->ltd_exp)
951                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
952                 break;
953         }
954         case LL_IOC_FID2MDTIDX: {
955                 struct lu_fid *fid = karg;
956                 int             mdt_index;
957
958                 rc = lmv_fld_lookup(lmv, fid, &mdt_index);
959                 if (rc != 0)
960                         RETURN(rc);
961
962                 /* Note: this is from llite(see ll_dir_ioctl()), @uarg does not
963                  * point to user space memory for FID2MDTIDX. */
964                 *(__u32 *)uarg = mdt_index;
965                 break;
966         }
967         case OBD_IOC_FID2PATH: {
968                 rc = lmv_fid2path(exp, len, karg, uarg);
969                 break;
970         }
971         case LL_IOC_HSM_STATE_GET:
972         case LL_IOC_HSM_STATE_SET:
973         case LL_IOC_HSM_ACTION: {
974                 struct md_op_data *op_data = karg;
975
976                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
977                 if (IS_ERR(tgt))
978                         RETURN(PTR_ERR(tgt));
979
980                 if (tgt->ltd_exp == NULL)
981                         RETURN(-EINVAL);
982
983                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
984                 break;
985         }
986         case LL_IOC_HSM_PROGRESS: {
987                 const struct hsm_progress_kernel *hpk = karg;
988
989                 tgt = lmv_fid2tgt(lmv, &hpk->hpk_fid);
990                 if (IS_ERR(tgt))
991                         RETURN(PTR_ERR(tgt));
992                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
993                 break;
994         }
995         case LL_IOC_HSM_REQUEST: {
996                 struct hsm_user_request *hur = karg;
997                 unsigned int reqcount = hur->hur_request.hr_itemcount;
998
999                 if (reqcount == 0)
1000                         RETURN(0);
1001
1002                 /* if the request is about a single fid
1003                  * or if there is a single MDS, no need to split
1004                  * the request. */
1005                 if (reqcount == 1 || count == 1) {
1006                         tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[0].hui_fid);
1007                         if (IS_ERR(tgt))
1008                                 RETURN(PTR_ERR(tgt));
1009                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1010                 } else {
1011                         /* split fid list to their respective MDS */
1012                         lmv_foreach_connected_tgt(lmv, tgt) {
1013                                 int nr, rc1;
1014                                 size_t reqlen;
1015                                 struct hsm_user_request *req;
1016
1017                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
1018                                 if (nr < 0)
1019                                         RETURN(nr);
1020                                 if (nr == 0) /* nothing for this MDS */
1021                                         continue;
1022
1023                                 /* build a request with fids for this MDS */
1024                                 reqlen = offsetof(typeof(*hur),
1025                                                   hur_user_item[nr])
1026                                                 + hur->hur_request.hr_data_len;
1027                                 OBD_ALLOC_LARGE(req, reqlen);
1028                                 if (req == NULL)
1029                                         RETURN(-ENOMEM);
1030                                 rc1 = lmv_hsm_req_build(lmv, hur, tgt, req);
1031                                 if (rc1 < 0)
1032                                         GOTO(hsm_req_err, rc1);
1033                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1034                                                     req, uarg);
1035 hsm_req_err:
1036                                 if (rc1 != 0 && rc == 0)
1037                                         rc = rc1;
1038                                 OBD_FREE_LARGE(req, reqlen);
1039                         }
1040                 }
1041                 break;
1042         }
1043         case LL_IOC_LOV_SWAP_LAYOUTS: {
1044                 struct md_op_data *op_data = karg;
1045                 struct lmv_tgt_desc *tgt1, *tgt2;
1046
1047                 tgt1 = lmv_fid2tgt(lmv, &op_data->op_fid1);
1048                 if (IS_ERR(tgt1))
1049                         RETURN(PTR_ERR(tgt1));
1050
1051                 tgt2 = lmv_fid2tgt(lmv, &op_data->op_fid2);
1052                 if (IS_ERR(tgt2))
1053                         RETURN(PTR_ERR(tgt2));
1054
1055                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1056                         RETURN(-EINVAL);
1057
1058                 /* only files on same MDT can have their layouts swapped */
1059                 if (tgt1->ltd_index != tgt2->ltd_index)
1060                         RETURN(-EPERM);
1061
1062                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1063                 break;
1064         }
1065         case LL_IOC_HSM_CT_START: {
1066                 struct lustre_kernelcomm *lk = karg;
1067                 if (lk->lk_flags & LK_FLG_STOP)
1068                         rc = lmv_hsm_ct_unregister(obd, cmd, len, lk, uarg);
1069                 else
1070                         rc = lmv_hsm_ct_register(obd, cmd, len, lk, uarg);
1071                 break;
1072         }
1073         default:
1074                 lmv_foreach_connected_tgt(lmv, tgt) {
1075                         struct obd_device *mdc_obd;
1076                         int err;
1077
1078                         /* ll_umount_begin() sets force flag but for lmv, not
1079                          * mdc. Let's pass it through */
1080                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1081                         mdc_obd->obd_force = obd->obd_force;
1082                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1083                         if (err) {
1084                                 if (tgt->ltd_active) {
1085                                         CERROR("error: iocontrol MDC %s on MDT"
1086                                                " idx %d cmd %x: err = %d\n",
1087                                                tgt->ltd_uuid.uuid,
1088                                                tgt->ltd_index, cmd, err);
1089                                         if (!rc)
1090                                                 rc = err;
1091                                 }
1092                         } else
1093                                 set = 1;
1094                 }
1095                 if (!set && !rc)
1096                         rc = -EIO;
1097         }
1098         RETURN(rc);
1099 }
1100
1101 int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1102                   struct lu_fid *fid, struct md_op_data *op_data)
1103 {
1104         struct obd_device *obd = class_exp2obd(exp);
1105         struct lmv_obd *lmv = &obd->u.lmv;
1106         struct lmv_tgt_desc *tgt;
1107         int rc;
1108
1109         ENTRY;
1110
1111         LASSERT(op_data);
1112         LASSERT(fid);
1113
1114         tgt = lmv_tgt(lmv, op_data->op_mds);
1115         if (!tgt)
1116                 RETURN(-ENODEV);
1117
1118         if (!tgt->ltd_active || !tgt->ltd_exp)
1119                 RETURN(-ENODEV);
1120
1121         /*
1122          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1123          * on server that seq in new allocated fid is not yet known.
1124          */
1125         mutex_lock(&tgt->ltd_fid_mutex);
1126         rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1127         mutex_unlock(&tgt->ltd_fid_mutex);
1128         if (rc > 0) {
1129                 LASSERT(fid_is_sane(fid));
1130                 rc = 0;
1131         }
1132
1133         RETURN(rc);
1134 }
1135
1136 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1137 {
1138         struct lmv_obd *lmv = &obd->u.lmv;
1139         struct lmv_desc *desc;
1140         struct lnet_processid lnet_id;
1141         int i = 0;
1142         int rc;
1143
1144         ENTRY;
1145
1146         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1147                 CERROR("LMV setup requires a descriptor\n");
1148                 RETURN(-EINVAL);
1149         }
1150
1151         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1152         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1153                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1154                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1155                 RETURN(-EINVAL);
1156         }
1157
1158         obd_str2uuid(&lmv->lmv_mdt_descs.ltd_lmv_desc.ld_uuid,
1159                      desc->ld_uuid.uuid);
1160         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_tgt_count = 0;
1161         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count = 0;
1162         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage =
1163                 LMV_DESC_QOS_MAXAGE_DEFAULT;
1164         lmv->max_def_easize = 0;
1165         lmv->max_easize = 0;
1166
1167         spin_lock_init(&lmv->lmv_lock);
1168
1169         /*
1170          * initialize rr_index to lower 32bit of netid, so that client
1171          * can distribute subdirs evenly from the beginning.
1172          */
1173         while (LNetGetId(i++, &lnet_id) != -ENOENT) {
1174                 if (!nid_is_lo0(&lnet_id.nid)) {
1175                         lmv->lmv_qos_rr_index = ntohl(lnet_id.nid.nid_addr[0]);
1176                         break;
1177                 }
1178         }
1179
1180         rc = lmv_tunables_init(obd);
1181         if (rc)
1182                 CWARN("%s: error adding LMV sysfs/debugfs files: rc = %d\n",
1183                       obd->obd_name, rc);
1184
1185         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1186                              LUSTRE_CLI_FLD_HASH_DHT);
1187         if (rc)
1188                 CERROR("Can't init FLD, err %d\n", rc);
1189
1190         rc = lu_tgt_descs_init(&lmv->lmv_mdt_descs, true);
1191         if (rc)
1192                 CWARN("%s: error initialize target table: rc = %d\n",
1193                       obd->obd_name, rc);
1194
1195         RETURN(rc);
1196 }
1197
1198 static int lmv_cleanup(struct obd_device *obd)
1199 {
1200         struct lmv_obd *lmv = &obd->u.lmv;
1201         struct lu_tgt_desc *tgt;
1202         struct lu_tgt_desc *tmp;
1203
1204         ENTRY;
1205
1206         fld_client_fini(&lmv->lmv_fld);
1207         fld_client_debugfs_fini(&lmv->lmv_fld);
1208
1209         lprocfs_obd_cleanup(obd);
1210         lprocfs_free_md_stats(obd);
1211
1212         lmv_foreach_tgt_safe(lmv, tgt, tmp)
1213                 lmv_del_target(lmv, tgt);
1214         lu_tgt_descs_fini(&lmv->lmv_mdt_descs);
1215
1216         RETURN(0);
1217 }
1218
1219 static int lmv_process_config(struct obd_device *obd, size_t len, void *buf)
1220 {
1221         struct lustre_cfg       *lcfg = buf;
1222         struct obd_uuid         obd_uuid;
1223         int                     gen;
1224         __u32                   index;
1225         int                     rc;
1226         ENTRY;
1227
1228         switch (lcfg->lcfg_command) {
1229         case LCFG_ADD_MDC:
1230                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1231                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1232                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1233                         GOTO(out, rc = -EINVAL);
1234
1235                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1236
1237                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1)
1238                         GOTO(out, rc = -EINVAL);
1239                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1240                         GOTO(out, rc = -EINVAL);
1241                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1242                 GOTO(out, rc);
1243         default:
1244                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1245                 GOTO(out, rc = -EINVAL);
1246         }
1247 out:
1248         RETURN(rc);
1249 }
1250
1251 static int lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags)
1252 {
1253         int i;
1254
1255         if (flags & OBD_STATFS_FOR_MDT0)
1256                 return 0;
1257
1258         if (lmv->lmv_statfs_start || lmv->lmv_mdt_count == 1)
1259                 return lmv->lmv_statfs_start;
1260
1261         /* choose initial MDT for this client */
1262         for (i = 0;; i++) {
1263                 struct lnet_processid lnet_id;
1264                 if (LNetGetId(i, &lnet_id) == -ENOENT)
1265                         break;
1266
1267                 if (!nid_is_lo0(&lnet_id.nid)) {
1268                         /* We dont need a full 64-bit modulus, just enough
1269                          * to distribute the requests across MDTs evenly.
1270                          */
1271                         lmv->lmv_statfs_start = nidhash(&lnet_id.nid) %
1272                                                 lmv->lmv_mdt_count;
1273                         break;
1274                 }
1275         }
1276
1277         return lmv->lmv_statfs_start;
1278 }
1279
1280 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1281                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
1282 {
1283         struct obd_device *obd = class_exp2obd(exp);
1284         struct lmv_obd *lmv = &obd->u.lmv;
1285         struct obd_statfs *temp;
1286         struct lu_tgt_desc *tgt;
1287         __u32 i;
1288         __u32 idx;
1289         int rc = 0;
1290         int err = 0;
1291
1292         ENTRY;
1293
1294         OBD_ALLOC(temp, sizeof(*temp));
1295         if (temp == NULL)
1296                 RETURN(-ENOMEM);
1297
1298         /* distribute statfs among MDTs */
1299         idx = lmv_select_statfs_mdt(lmv, flags);
1300
1301         for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++, idx++) {
1302                 idx = idx % lmv->lmv_mdt_descs.ltd_tgts_size;
1303                 tgt = lmv_tgt(lmv, idx);
1304                 if (!tgt || !tgt->ltd_exp)
1305                         continue;
1306
1307                 rc = obd_statfs(env, tgt->ltd_exp, temp, max_age,
1308                                 flags | OBD_STATFS_NESTED);
1309                 if (rc) {
1310                         CERROR("%s: can't stat MDS #%d: rc = %d\n",
1311                                tgt->ltd_exp->exp_obd->obd_name, i, rc);
1312                         err = rc;
1313                         /* Try another MDT */
1314                         if (flags & OBD_STATFS_SUM)
1315                                 continue;
1316                         GOTO(out_free_temp, rc);
1317                 }
1318
1319                 if (temp->os_state & OS_STATFS_SUM ||
1320                     flags == OBD_STATFS_FOR_MDT0) {
1321                         /* reset to the last aggregated values
1322                          * and don't sum with non-aggrated data */
1323                         /* If the statfs is from mount, it needs to retrieve
1324                          * necessary information from MDT0. i.e. mount does
1325                          * not need the merged osfs from all of MDT. Also
1326                          * clients can be mounted as long as MDT0 is in
1327                          * service */
1328                         *osfs = *temp;
1329                         GOTO(out_free_temp, rc);
1330                 }
1331
1332                 if (i == 0) {
1333                         *osfs = *temp;
1334                 } else {
1335                         osfs->os_bavail += temp->os_bavail;
1336                         osfs->os_blocks += temp->os_blocks;
1337                         osfs->os_ffree += temp->os_ffree;
1338                         osfs->os_files += temp->os_files;
1339                         osfs->os_granted += temp->os_granted;
1340                 }
1341         }
1342         /* There is no stats from some MDTs, data incomplete */
1343         if (err)
1344                 rc = err;
1345 out_free_temp:
1346         OBD_FREE(temp, sizeof(*temp));
1347         RETURN(rc);
1348 }
1349
1350 static int lmv_statfs_update(void *cookie, int rc)
1351 {
1352         struct obd_info *oinfo = cookie;
1353         struct obd_device *obd = oinfo->oi_obd;
1354         struct lmv_obd *lmv = &obd->u.lmv;
1355         struct lmv_tgt_desc *tgt = oinfo->oi_tgt;
1356         struct obd_statfs *osfs = oinfo->oi_osfs;
1357
1358         /*
1359          * NB: don't deactivate TGT upon error, because we may not trigger async
1360          * statfs any longer, then there is no chance to activate TGT.
1361          */
1362         if (!rc) {
1363                 spin_lock(&lmv->lmv_lock);
1364                 tgt->ltd_statfs = *osfs;
1365                 tgt->ltd_statfs_age = ktime_get_seconds();
1366                 spin_unlock(&lmv->lmv_lock);
1367                 set_bit(LQ_DIRTY, &lmv->lmv_qos.lq_flags);
1368         }
1369
1370         return rc;
1371 }
1372
1373 /* update tgt statfs async if it's ld_qos_maxage old */
1374 int lmv_statfs_check_update(struct obd_device *obd, struct lmv_tgt_desc *tgt)
1375 {
1376         struct obd_info oinfo = {
1377                 .oi_obd = obd,
1378                 .oi_tgt = tgt,
1379                 .oi_cb_up = lmv_statfs_update,
1380         };
1381         int rc;
1382
1383         if (ktime_get_seconds() - tgt->ltd_statfs_age <
1384             obd->u.lmv.lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage)
1385                 return 0;
1386
1387         rc = obd_statfs_async(tgt->ltd_exp, &oinfo, 0, NULL);
1388
1389         return rc;
1390 }
1391
1392 static int lmv_get_root(struct obd_export *exp, const char *fileset,
1393                         struct lu_fid *fid)
1394 {
1395         struct obd_device *obd = exp->exp_obd;
1396         struct lmv_obd *lmv = &obd->u.lmv;
1397         struct lu_tgt_desc *tgt = lmv_tgt(lmv, 0);
1398         int rc;
1399
1400         ENTRY;
1401
1402         if (!tgt)
1403                 RETURN(-ENODEV);
1404
1405         rc = md_get_root(tgt->ltd_exp, fileset, fid);
1406         RETURN(rc);
1407 }
1408
1409 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1410                         u64 obd_md_valid, const char *name, size_t buf_size,
1411                         struct ptlrpc_request **req)
1412 {
1413         struct obd_device *obd = exp->exp_obd;
1414         struct lmv_obd *lmv = &obd->u.lmv;
1415         struct lmv_tgt_desc *tgt;
1416         int rc;
1417
1418         ENTRY;
1419
1420         tgt = lmv_fid2tgt(lmv, fid);
1421         if (IS_ERR(tgt))
1422                 RETURN(PTR_ERR(tgt));
1423
1424         rc = md_getxattr(tgt->ltd_exp, fid, obd_md_valid, name, buf_size, req);
1425
1426         RETURN(rc);
1427 }
1428
1429 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1430                         u64 obd_md_valid, const char *name,
1431                         const void *value, size_t value_size,
1432                         unsigned int xattr_flags, u32 suppgid,
1433                         struct ptlrpc_request **req)
1434 {
1435         struct obd_device *obd = exp->exp_obd;
1436         struct lmv_obd *lmv = &obd->u.lmv;
1437         struct lmv_tgt_desc *tgt;
1438         int rc;
1439
1440         ENTRY;
1441
1442         tgt = lmv_fid2tgt(lmv, fid);
1443         if (IS_ERR(tgt))
1444                 RETURN(PTR_ERR(tgt));
1445
1446         rc = md_setxattr(tgt->ltd_exp, fid, obd_md_valid, name,
1447                          value, value_size, xattr_flags, suppgid, req);
1448
1449         RETURN(rc);
1450 }
1451
1452 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1453                        struct ptlrpc_request **request)
1454 {
1455         struct obd_device *obd = exp->exp_obd;
1456         struct lmv_obd *lmv = &obd->u.lmv;
1457         struct lmv_tgt_desc *tgt;
1458         int rc;
1459
1460         ENTRY;
1461
1462         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
1463         if (IS_ERR(tgt))
1464                 RETURN(PTR_ERR(tgt));
1465
1466         if (op_data->op_flags & MF_GET_MDT_IDX) {
1467                 op_data->op_mds = tgt->ltd_index;
1468                 RETURN(0);
1469         }
1470
1471         rc = md_getattr(tgt->ltd_exp, op_data, request);
1472
1473         RETURN(rc);
1474 }
1475
1476 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1477 {
1478         struct obd_device *obd = exp->exp_obd;
1479         struct lmv_obd *lmv = &obd->u.lmv;
1480         struct lu_tgt_desc *tgt;
1481
1482         ENTRY;
1483
1484         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1485
1486         /*
1487          * With DNE every object can have two locks in different namespaces:
1488          * lookup lock in space of MDT storing direntry and update/open lock in
1489          * space of MDT storing inode.
1490          */
1491         lmv_foreach_connected_tgt(lmv, tgt)
1492                 md_null_inode(tgt->ltd_exp, fid);
1493
1494         RETURN(0);
1495 }
1496
1497 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1498                      struct md_open_data *mod, struct ptlrpc_request **request)
1499 {
1500         struct obd_device *obd = exp->exp_obd;
1501         struct lmv_obd *lmv = &obd->u.lmv;
1502         struct lmv_tgt_desc *tgt;
1503         int rc;
1504
1505         ENTRY;
1506
1507         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
1508         if (IS_ERR(tgt))
1509                 RETURN(PTR_ERR(tgt));
1510
1511         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1512         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1513         RETURN(rc);
1514 }
1515
1516 static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv,
1517                                               struct md_op_data *op_data)
1518 {
1519         struct lu_tgt_desc *tgt, *cur = NULL;
1520         __u64 total_avail = 0;
1521         __u64 total_weight = 0;
1522         __u64 cur_weight = 0;
1523         int total_usable = 0;
1524         __u64 rand;
1525         int rc;
1526
1527         ENTRY;
1528
1529         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1530                 RETURN(ERR_PTR(-EAGAIN));
1531
1532         down_write(&lmv->lmv_qos.lq_rw_sem);
1533
1534         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1535                 GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1536
1537         rc = ltd_qos_penalties_calc(&lmv->lmv_mdt_descs);
1538         if (rc)
1539                 GOTO(unlock, tgt = ERR_PTR(rc));
1540
1541         lmv_foreach_tgt(lmv, tgt) {
1542                 if (!tgt->ltd_exp || !tgt->ltd_active) {
1543                         tgt->ltd_qos.ltq_usable = 0;
1544                         continue;
1545                 }
1546
1547                 tgt->ltd_qos.ltq_usable = 1;
1548                 lu_tgt_qos_weight_calc(tgt, true);
1549                 if (tgt->ltd_index == op_data->op_mds)
1550                         cur = tgt;
1551                 total_avail += tgt->ltd_qos.ltq_avail;
1552                 total_weight += tgt->ltd_qos.ltq_weight;
1553                 total_usable++;
1554         }
1555
1556         /* If current MDT has above-average space and dir is not aleady using
1557          * round-robin to spread across more MDTs, stay on the parent MDT
1558          * to avoid creating needless remote MDT directories.  Remote dirs
1559          * close to the root balance space more effectively than bottom dirs,
1560          * so prefer to create remote dirs at top level of directory tree.
1561          * "16 / (dir_depth + 10)" is the factor to make it less likely
1562          * for top-level directories to stay local unless they have more than
1563          * average free space, while deep dirs prefer local until more full.
1564          *    depth=0 -> 160%, depth=3 -> 123%, depth=6 -> 100%,
1565          *    depth=9 -> 84%, depth=12 -> 73%, depth=15 -> 64%
1566          */
1567         if (!lmv_op_default_rr_mkdir(op_data)) {
1568                 rand = total_avail * 16 /
1569                         (total_usable * (op_data->op_dir_depth + 10));
1570                 if (cur && cur->ltd_qos.ltq_avail >= rand) {
1571                         tgt = cur;
1572                         GOTO(unlock, tgt);
1573                 }
1574         }
1575
1576         rand = lu_prandom_u64_max(total_weight);
1577
1578         lmv_foreach_connected_tgt(lmv, tgt) {
1579                 if (!tgt->ltd_qos.ltq_usable)
1580                         continue;
1581
1582                 cur_weight += tgt->ltd_qos.ltq_weight;
1583                 if (cur_weight < rand)
1584                         continue;
1585
1586                 ltd_qos_update(&lmv->lmv_mdt_descs, tgt, &total_weight);
1587                 GOTO(unlock, tgt);
1588         }
1589
1590         /* no proper target found */
1591         GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1592 unlock:
1593         up_write(&lmv->lmv_qos.lq_rw_sem);
1594
1595         return tgt;
1596 }
1597
1598 static struct lu_tgt_desc *lmv_locate_tgt_rr(struct lmv_obd *lmv)
1599 {
1600         struct lu_tgt_desc *tgt;
1601         int i;
1602         int index;
1603
1604         ENTRY;
1605
1606         spin_lock(&lmv->lmv_lock);
1607         for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++) {
1608                 index = (i + lmv->lmv_qos_rr_index) %
1609                         lmv->lmv_mdt_descs.ltd_tgts_size;
1610                 tgt = lmv_tgt(lmv, index);
1611                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
1612                         continue;
1613
1614                 lmv->lmv_qos_rr_index = (tgt->ltd_index + 1) %
1615                                         lmv->lmv_mdt_descs.ltd_tgts_size;
1616                 spin_unlock(&lmv->lmv_lock);
1617
1618                 RETURN(tgt);
1619         }
1620         spin_unlock(&lmv->lmv_lock);
1621
1622         RETURN(ERR_PTR(-ENODEV));
1623 }
1624
1625 /* locate MDT which is less full (avoid the most full MDT) */
1626 static struct lu_tgt_desc *lmv_locate_tgt_lf(struct lmv_obd *lmv)
1627 {
1628         struct lu_tgt_desc *min = NULL;
1629         struct lu_tgt_desc *tgt;
1630         __u64 avail = 0;
1631         __u64 rand;
1632
1633         ENTRY;
1634
1635         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1636                 RETURN(ERR_PTR(-EAGAIN));
1637
1638         down_write(&lmv->lmv_qos.lq_rw_sem);
1639
1640         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1641                 GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1642
1643         lmv_foreach_tgt(lmv, tgt) {
1644                 if (!tgt->ltd_exp || !tgt->ltd_active) {
1645                         tgt->ltd_qos.ltq_usable = 0;
1646                         continue;
1647                 }
1648
1649                 tgt->ltd_qos.ltq_usable = 1;
1650                 lu_tgt_qos_weight_calc(tgt, true);
1651                 avail += tgt->ltd_qos.ltq_avail;
1652                 if (!min || min->ltd_qos.ltq_avail > tgt->ltd_qos.ltq_avail)
1653                         min = tgt;
1654         }
1655
1656         /* avoid the most full MDT */
1657         if (min)
1658                 avail -= min->ltd_qos.ltq_avail;
1659
1660         rand = lu_prandom_u64_max(avail);
1661         avail = 0;
1662         lmv_foreach_connected_tgt(lmv, tgt) {
1663                 if (!tgt->ltd_qos.ltq_usable)
1664                         continue;
1665
1666                 if (tgt == min)
1667                         continue;
1668
1669                 avail += tgt->ltd_qos.ltq_avail;
1670                 if (avail < rand)
1671                         continue;
1672
1673                 GOTO(unlock, tgt);
1674         }
1675
1676         /* no proper target found */
1677         GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1678 unlock:
1679         up_write(&lmv->lmv_qos.lq_rw_sem);
1680
1681         RETURN(tgt);
1682 }
1683
1684 /* locate MDT by file name, for striped directory, the file name hash decides
1685  * which stripe its dirent is stored.
1686  */
1687 static struct lmv_tgt_desc *
1688 lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1689                        const char *name, int namelen, struct lu_fid *fid,
1690                        __u32 *mds, bool new_layout)
1691 {
1692         struct lmv_tgt_desc *tgt;
1693         const struct lmv_oinfo *oinfo;
1694
1695         if (!lmv_dir_striped(lsm) || !namelen) {
1696                 tgt = lmv_fid2tgt(lmv, fid);
1697                 if (IS_ERR(tgt))
1698                         return tgt;
1699
1700                 *mds = tgt->ltd_index;
1701                 return tgt;
1702         }
1703
1704         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) {
1705                 if (cfs_fail_val >= lsm->lsm_md_stripe_count)
1706                         return ERR_PTR(-EBADF);
1707                 oinfo = &lsm->lsm_md_oinfo[cfs_fail_val];
1708         } else {
1709                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen, new_layout);
1710                 if (IS_ERR(oinfo))
1711                         return ERR_CAST(oinfo);
1712         }
1713
1714         /* check stripe FID is sane */
1715         if (!fid_is_sane(&oinfo->lmo_fid))
1716                 return ERR_PTR(-ENODEV);
1717
1718         *fid = oinfo->lmo_fid;
1719         *mds = oinfo->lmo_mds;
1720         tgt = lmv_tgt(lmv, oinfo->lmo_mds);
1721
1722         CDEBUG(D_INODE, "locate MDT %u parent "DFID"\n", *mds, PFID(fid));
1723
1724         return tgt ? tgt : ERR_PTR(-ENODEV);
1725 }
1726
1727 /**
1728  * Locate MDT of op_data->op_fid1
1729  *
1730  * For striped directory, it will locate the stripe by name hash, if hash_type
1731  * is unknown, it will return the stripe specified by 'op_data->op_stripe_index'
1732  * which is set outside, and if dir is migrating, 'op_data->op_new_layout'
1733  * indicates whether old or new layout is used to locate.
1734  *
1735  * For plain direcotry, it just locate the MDT of op_data->op_fid1.
1736  *
1737  * \param[in] lmv               LMV device
1738  * \param[in/out] op_data       client MD stack parameters, name, namelen etc,
1739  *                              op_mds and op_fid1 will be updated if op_mea1
1740  *                              indicates fid1 represents a striped directory.
1741  *
1742  * retval               pointer to the lmv_tgt_desc if succeed.
1743  *                      ERR_PTR(errno) if failed.
1744  */
1745 struct lmv_tgt_desc *
1746 lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data)
1747 {
1748         struct lmv_stripe_md *lsm = op_data->op_mea1;
1749         struct lmv_oinfo *oinfo;
1750         struct lmv_tgt_desc *tgt;
1751
1752         if (lmv_dir_foreign(lsm))
1753                 return ERR_PTR(-ENODATA);
1754
1755         /* During creating VOLATILE file, it should honor the mdt
1756          * index if the file under striped dir is being restored, see
1757          * ct_restore(). */
1758         if (op_data->op_bias & MDS_CREATE_VOLATILE &&
1759             op_data->op_mds != LMV_OFFSET_DEFAULT) {
1760                 tgt = lmv_tgt(lmv, op_data->op_mds);
1761                 if (!tgt)
1762                         return ERR_PTR(-ENODEV);
1763
1764                 if (lmv_dir_striped(lsm)) {
1765                         int i;
1766
1767                         /* refill the right parent fid */
1768                         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
1769                                 oinfo = &lsm->lsm_md_oinfo[i];
1770                                 if (oinfo->lmo_mds == op_data->op_mds) {
1771                                         op_data->op_fid1 = oinfo->lmo_fid;
1772                                         break;
1773                                 }
1774                         }
1775
1776                         if (i == lsm->lsm_md_stripe_count)
1777                                 op_data->op_fid1 = lsm->lsm_md_oinfo[0].lmo_fid;
1778                 }
1779         } else if (lmv_dir_bad_hash(lsm)) {
1780                 LASSERT(op_data->op_stripe_index < lsm->lsm_md_stripe_count);
1781                 oinfo = &lsm->lsm_md_oinfo[op_data->op_stripe_index];
1782
1783                 op_data->op_fid1 = oinfo->lmo_fid;
1784                 op_data->op_mds = oinfo->lmo_mds;
1785                 tgt = lmv_tgt(lmv, oinfo->lmo_mds);
1786                 if (!tgt)
1787                         return ERR_PTR(-ENODEV);
1788         } else {
1789                 tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea1,
1790                                 op_data->op_name, op_data->op_namelen,
1791                                 &op_data->op_fid1, &op_data->op_mds,
1792                                 op_data->op_new_layout);
1793         }
1794
1795         return tgt;
1796 }
1797
1798 /* Locate MDT of op_data->op_fid2 for link/rename */
1799 static struct lmv_tgt_desc *
1800 lmv_locate_tgt2(struct lmv_obd *lmv, struct md_op_data *op_data)
1801 {
1802         struct lmv_tgt_desc *tgt;
1803         int rc;
1804
1805         LASSERT(op_data->op_name);
1806         if (lmv_dir_layout_changing(op_data->op_mea2)) {
1807                 struct lu_fid fid1 = op_data->op_fid1;
1808                 struct lmv_stripe_md *lsm1 = op_data->op_mea1;
1809                 struct ptlrpc_request *request = NULL;
1810
1811                 /*
1812                  * avoid creating new file under old layout of migrating
1813                  * directory, check it here.
1814                  */
1815                 tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea2,
1816                                 op_data->op_name, op_data->op_namelen,
1817                                 &op_data->op_fid2, &op_data->op_mds, false);
1818                 if (IS_ERR(tgt))
1819                         RETURN(tgt);
1820
1821                 op_data->op_fid1 = op_data->op_fid2;
1822                 op_data->op_mea1 = op_data->op_mea2;
1823                 rc = md_getattr_name(tgt->ltd_exp, op_data, &request);
1824                 op_data->op_fid1 = fid1;
1825                 op_data->op_mea1 = lsm1;
1826                 if (!rc) {
1827                         ptlrpc_req_finished(request);
1828                         RETURN(ERR_PTR(-EEXIST));
1829                 }
1830
1831                 if (rc != -ENOENT)
1832                         RETURN(ERR_PTR(rc));
1833         }
1834
1835         return lmv_locate_tgt_by_name(lmv, op_data->op_mea2, op_data->op_name,
1836                                 op_data->op_namelen, &op_data->op_fid2,
1837                                 &op_data->op_mds, true);
1838 }
1839
1840 int lmv_old_layout_lookup(struct lmv_obd *lmv, struct md_op_data *op_data)
1841 {
1842         struct lu_tgt_desc *tgt;
1843         struct ptlrpc_request *request;
1844         int rc;
1845
1846         LASSERT(lmv_dir_layout_changing(op_data->op_mea1));
1847         LASSERT(!op_data->op_new_layout);
1848
1849         tgt = lmv_locate_tgt(lmv, op_data);
1850         if (IS_ERR(tgt))
1851                 return PTR_ERR(tgt);
1852
1853         rc = md_getattr_name(tgt->ltd_exp, op_data, &request);
1854         if (!rc) {
1855                 ptlrpc_req_finished(request);
1856                 return -EEXIST;
1857         }
1858
1859         return rc;
1860 }
1861
1862 /* mkdir by QoS upon 'lfs mkdir -i -1'.
1863  *
1864  * NB, mkdir by QoS only if parent is not striped, this is to avoid remote
1865  * directories under striped directory.
1866  */
1867 static inline bool lmv_op_user_qos_mkdir(const struct md_op_data *op_data)
1868 {
1869         const struct lmv_user_md *lum = op_data->op_data;
1870
1871         if (op_data->op_code != LUSTRE_OPC_MKDIR)
1872                 return false;
1873
1874         if (lmv_dir_striped(op_data->op_mea1))
1875                 return false;
1876
1877         return (op_data->op_cli_flags & CLI_SET_MEA) && lum &&
1878                le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC &&
1879                le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT;
1880 }
1881
1882 /* mkdir by QoS if either ROOT or parent default LMV is space balanced. */
1883 static inline bool lmv_op_default_qos_mkdir(const struct md_op_data *op_data)
1884 {
1885         const struct lmv_stripe_md *lsm = op_data->op_default_mea1;
1886
1887         if (op_data->op_code != LUSTRE_OPC_MKDIR)
1888                 return false;
1889
1890         if (lmv_dir_striped(op_data->op_mea1))
1891                 return false;
1892
1893         return (op_data->op_flags & MF_QOS_MKDIR) ||
1894                (lsm && lsm->lsm_md_master_mdt_index == LMV_OFFSET_DEFAULT);
1895 }
1896
1897 /* if parent default LMV is space balanced, and
1898  * 1. max_inherit_rr is set
1899  * 2. or parent is ROOT
1900  * mkdir roundrobin. Or if parent doesn't have default LMV, while ROOT default
1901  * LMV requests roundrobin mkdir, do the same.
1902  * NB, this needs to check server is balanced, which is done by caller.
1903  */
1904 static inline bool lmv_op_default_rr_mkdir(const struct md_op_data *op_data)
1905 {
1906         const struct lmv_stripe_md *lsm = op_data->op_default_mea1;
1907
1908         return (op_data->op_flags & MF_RR_MKDIR) ||
1909                (lsm && lsm->lsm_md_max_inherit_rr != LMV_INHERIT_RR_NONE) ||
1910                fid_is_root(&op_data->op_fid1);
1911 }
1912
1913 /* 'lfs mkdir -i <specific_MDT>' */
1914 static inline bool lmv_op_user_specific_mkdir(const struct md_op_data *op_data)
1915 {
1916         const struct lmv_user_md *lum = op_data->op_data;
1917
1918         return op_data->op_code == LUSTRE_OPC_MKDIR &&
1919                op_data->op_cli_flags & CLI_SET_MEA && lum &&
1920                (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC ||
1921                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) &&
1922                le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT;
1923 }
1924
1925 /* parent default LMV master_mdt_index is not -1. */
1926 static inline bool
1927 lmv_op_default_specific_mkdir(const struct md_op_data *op_data)
1928 {
1929         return op_data->op_code == LUSTRE_OPC_MKDIR &&
1930                op_data->op_default_mea1 &&
1931                op_data->op_default_mea1->lsm_md_master_mdt_index !=
1932                         LMV_OFFSET_DEFAULT;
1933 }
1934
1935 /* locate MDT by space usage */
1936 static struct lu_tgt_desc *lmv_locate_tgt_by_space(struct lmv_obd *lmv,
1937                                                    struct md_op_data *op_data,
1938                                                    struct lmv_tgt_desc *tgt)
1939 {
1940         struct lmv_tgt_desc *tmp = tgt;
1941
1942         tgt = lmv_locate_tgt_qos(lmv, op_data);
1943         if (tgt == ERR_PTR(-EAGAIN)) {
1944                 if (ltd_qos_is_balanced(&lmv->lmv_mdt_descs) &&
1945                     !lmv_op_default_rr_mkdir(op_data) &&
1946                     !lmv_op_user_qos_mkdir(op_data))
1947                         /* if not necessary, don't create remote directory. */
1948                         tgt = tmp;
1949                 else
1950                         tgt = lmv_locate_tgt_rr(lmv);
1951         }
1952
1953         /*
1954          * only update statfs after QoS mkdir, this means the cached statfs may
1955          * be stale, and current mkdir may not follow QoS accurately, but it's
1956          * not serious, and avoids periodic statfs when client doesn't mkdir by
1957          * QoS.
1958          */
1959         if (!IS_ERR(tgt)) {
1960                 op_data->op_mds = tgt->ltd_index;
1961                 lmv_statfs_check_update(lmv2obd_dev(lmv), tgt);
1962         }
1963
1964         return tgt;
1965 }
1966
1967 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1968                 const void *data, size_t datalen, umode_t mode, uid_t uid,
1969                 gid_t gid, kernel_cap_t cap_effective, __u64 rdev,
1970                 struct ptlrpc_request **request)
1971 {
1972         struct obd_device *obd = exp->exp_obd;
1973         struct lmv_obd *lmv = &obd->u.lmv;
1974         struct lmv_tgt_desc *tgt;
1975         struct mdt_body *repbody;
1976         int rc;
1977
1978         ENTRY;
1979
1980         if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count)
1981                 RETURN(-EIO);
1982
1983         if (lmv_dir_bad_hash(op_data->op_mea1))
1984                 RETURN(-EBADF);
1985
1986         if (lmv_dir_layout_changing(op_data->op_mea1)) {
1987                 /*
1988                  * if parent is migrating, create() needs to lookup existing
1989                  * name in both old and new layout, check old layout on client.
1990                  */
1991                 rc = lmv_old_layout_lookup(lmv, op_data);
1992                 if (rc != -ENOENT)
1993                         RETURN(rc);
1994
1995                 op_data->op_new_layout = true;
1996         }
1997
1998         tgt = lmv_locate_tgt(lmv, op_data);
1999         if (IS_ERR(tgt))
2000                 RETURN(PTR_ERR(tgt));
2001
2002         /* the order to apply policy in mkdir:
2003          * 1. is "lfs mkdir -i N"? mkdir on MDT N.
2004          * 2. is "lfs mkdir -i -1"? mkdir by space usage.
2005          * 3. is starting MDT specified in default LMV? mkdir on MDT N.
2006          * 4. is default LMV space balanced? mkdir by space usage.
2007          */
2008         if (lmv_op_user_specific_mkdir(op_data)) {
2009                 struct lmv_user_md *lum = op_data->op_data;
2010
2011                 op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset);
2012                 tgt = lmv_tgt(lmv, op_data->op_mds);
2013                 if (!tgt)
2014                         RETURN(-ENODEV);
2015         } else if (lmv_op_user_qos_mkdir(op_data)) {
2016                 tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt);
2017                 if (IS_ERR(tgt))
2018                         RETURN(PTR_ERR(tgt));
2019         } else if (lmv_op_default_specific_mkdir(op_data)) {
2020                 op_data->op_mds =
2021                         op_data->op_default_mea1->lsm_md_master_mdt_index;
2022                 tgt = lmv_tgt(lmv, op_data->op_mds);
2023                 if (!tgt)
2024                         RETURN(-ENODEV);
2025         } else if (lmv_op_default_qos_mkdir(op_data)) {
2026                 tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt);
2027                 if (IS_ERR(tgt))
2028                         RETURN(PTR_ERR(tgt));
2029         }
2030
2031 retry:
2032         rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
2033         if (rc)
2034                 RETURN(rc);
2035
2036         CDEBUG(D_INODE, "CREATE name '%.*s' "DFID" on "DFID" -> mds #%x\n",
2037                 (int)op_data->op_namelen, op_data->op_name,
2038                 PFID(&op_data->op_fid2), PFID(&op_data->op_fid1),
2039                 op_data->op_mds);
2040
2041         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2042         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
2043                        cap_effective, rdev, request);
2044         if (rc == 0) {
2045                 if (*request == NULL)
2046                         RETURN(rc);
2047                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
2048         }
2049
2050         /* dir restripe needs to send to MDT where dir is located */
2051         if (rc != -EREMOTE ||
2052             !(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH))
2053                 RETURN(rc);
2054
2055         repbody = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2056         if (repbody == NULL)
2057                 RETURN(-EPROTO);
2058
2059         /* Not cross-ref case, just get out of here. */
2060         if (likely(!(repbody->mbo_valid & OBD_MD_MDS)))
2061                 RETURN(rc);
2062
2063         op_data->op_fid2 = repbody->mbo_fid1;
2064         ptlrpc_req_finished(*request);
2065         *request = NULL;
2066
2067         tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
2068         if (IS_ERR(tgt))
2069                 RETURN(PTR_ERR(tgt));
2070
2071         op_data->op_mds = tgt->ltd_index;
2072         goto retry;
2073 }
2074
2075 static int
2076 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
2077             const union ldlm_policy_data *policy, struct md_op_data *op_data,
2078             struct lustre_handle *lockh, __u64 extra_lock_flags)
2079 {
2080         struct obd_device *obd = exp->exp_obd;
2081         struct lmv_obd *lmv = &obd->u.lmv;
2082         struct lmv_tgt_desc *tgt;
2083         int rc;
2084
2085         ENTRY;
2086
2087         CDEBUG(D_INODE, "ENQUEUE on "DFID"\n", PFID(&op_data->op_fid1));
2088
2089         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2090         if (IS_ERR(tgt))
2091                 RETURN(PTR_ERR(tgt));
2092
2093         CDEBUG(D_INODE, "ENQUEUE on "DFID" -> mds #%u\n",
2094                PFID(&op_data->op_fid1), tgt->ltd_index);
2095
2096         rc = md_enqueue(tgt->ltd_exp, einfo, policy, op_data, lockh,
2097                         extra_lock_flags);
2098
2099         RETURN(rc);
2100 }
2101
2102 int
2103 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
2104                  struct ptlrpc_request **preq)
2105 {
2106         struct obd_device *obd = exp->exp_obd;
2107         struct lmv_obd *lmv = &obd->u.lmv;
2108         struct lmv_tgt_desc *tgt;
2109         struct mdt_body *body;
2110         int rc;
2111
2112         ENTRY;
2113
2114 retry:
2115         if (op_data->op_namelen == 2 &&
2116             op_data->op_name[0] == '.' && op_data->op_name[1] == '.')
2117                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2118         else
2119                 tgt = lmv_locate_tgt(lmv, op_data);
2120         if (IS_ERR(tgt))
2121                 RETURN(PTR_ERR(tgt));
2122
2123         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
2124                 (int)op_data->op_namelen, op_data->op_name,
2125                 PFID(&op_data->op_fid1), tgt->ltd_index);
2126
2127         rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
2128         if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) {
2129                 ptlrpc_req_finished(*preq);
2130                 *preq = NULL;
2131                 goto retry;
2132         }
2133
2134         if (rc)
2135                 RETURN(rc);
2136
2137         body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
2138         LASSERT(body != NULL);
2139
2140         if (body->mbo_valid & OBD_MD_MDS) {
2141                 op_data->op_fid1 = body->mbo_fid1;
2142                 op_data->op_valid |= OBD_MD_FLCROSSREF;
2143                 op_data->op_namelen = 0;
2144                 op_data->op_name = NULL;
2145
2146                 ptlrpc_req_finished(*preq);
2147                 *preq = NULL;
2148
2149                 goto retry;
2150         }
2151
2152         RETURN(rc);
2153 }
2154
/* Map one MF_MDC_CANCEL_FID<n> flag to the address of the matching op_fid<n>
 * member of \a op_data; evaluates to NULL for any other value.
 *
 * Both arguments are parenthesized so that expression arguments expand as
 * intended.  \a fl may be evaluated up to four times and must therefore be
 * free of side effects.
 */
#define md_op_data_fid(op_data, fl)				\
	((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :	\
	 (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :	\
	 (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :	\
	 (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :	\
	 NULL)
2161
2162 static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
2163                             struct md_op_data *op_data, __u32 op_tgt,
2164                             enum ldlm_mode mode, int bits, int flag)
2165 {
2166         struct lu_fid *fid = md_op_data_fid(op_data, flag);
2167         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2168         union ldlm_policy_data policy = { { 0 } };
2169         int rc = 0;
2170         ENTRY;
2171
2172         if (!fid_is_sane(fid))
2173                 RETURN(0);
2174
2175         if (tgt == NULL) {
2176                 tgt = lmv_fid2tgt(lmv, fid);
2177                 if (IS_ERR(tgt))
2178                         RETURN(PTR_ERR(tgt));
2179         }
2180
2181         if (tgt->ltd_index != op_tgt) {
2182                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
2183                 policy.l_inodebits.bits = bits;
2184                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
2185                                       mode, LCF_ASYNC, NULL);
2186         } else {
2187                 CDEBUG(D_INODE,
2188                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
2189                        op_tgt, PFID(fid));
2190                 op_data->op_flags |= flag;
2191                 rc = 0;
2192         }
2193
2194         RETURN(rc);
2195 }
2196
2197 /*
2198  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
2199  * op_data->op_fid2
2200  */
2201 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
2202                     struct ptlrpc_request **request)
2203 {
2204         struct obd_device       *obd = exp->exp_obd;
2205         struct lmv_obd          *lmv = &obd->u.lmv;
2206         struct lmv_tgt_desc     *tgt;
2207         int                      rc;
2208         ENTRY;
2209
2210         LASSERT(op_data->op_namelen != 0);
2211
2212         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
2213                PFID(&op_data->op_fid2), (int)op_data->op_namelen,
2214                op_data->op_name, PFID(&op_data->op_fid1));
2215
2216         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2217         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2218         op_data->op_cap = current_cap();
2219
2220         tgt = lmv_locate_tgt2(lmv, op_data);
2221         if (IS_ERR(tgt))
2222                 RETURN(PTR_ERR(tgt));
2223
2224         /*
2225          * Cancel UPDATE lock on child (fid1).
2226          */
2227         op_data->op_flags |= MF_MDC_CANCEL_FID2;
2228         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_index, LCK_EX,
2229                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2230         if (rc != 0)
2231                 RETURN(rc);
2232
2233         rc = md_link(tgt->ltd_exp, op_data, request);
2234
2235         RETURN(rc);
2236 }
2237
2238 /* migrate the top directory */
2239 static inline bool lmv_op_topdir_migrate(const struct md_op_data *op_data)
2240 {
2241         if (!S_ISDIR(op_data->op_mode))
2242                 return false;
2243
2244         if (lmv_dir_layout_changing(op_data->op_mea1))
2245                 return false;
2246
2247         return true;
2248 }
2249
2250 /* migrate top dir to specific MDTs */
2251 static inline bool lmv_topdir_specific_migrate(const struct md_op_data *op_data)
2252 {
2253         const struct lmv_user_md *lum = op_data->op_data;
2254
2255         if (!lmv_op_topdir_migrate(op_data))
2256                 return false;
2257
2258         return le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT;
2259 }
2260
2261 /* migrate top dir in QoS mode if user issued "lfs migrate -m -1..." */
2262 static inline bool lmv_topdir_qos_migrate(const struct md_op_data *op_data)
2263 {
2264         const struct lmv_user_md *lum = op_data->op_data;
2265
2266         if (!lmv_op_topdir_migrate(op_data))
2267                 return false;
2268
2269         return le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT;
2270 }
2271
2272 static inline bool lmv_subdir_specific_migrate(const struct md_op_data *op_data)
2273 {
2274         const struct lmv_user_md *lum = op_data->op_data;
2275
2276         if (!S_ISDIR(op_data->op_mode))
2277                 return false;
2278
2279         if (!lmv_dir_layout_changing(op_data->op_mea1))
2280                 return false;
2281
2282         return le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT;
2283 }
2284
2285 static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data,
2286                         const char *name, size_t namelen,
2287                         struct ptlrpc_request **request)
2288 {
2289         struct obd_device *obd = exp->exp_obd;
2290         struct lmv_obd *lmv = &obd->u.lmv;
2291         struct lmv_stripe_md *lsm = op_data->op_mea1;
2292         struct lmv_tgt_desc *parent_tgt;
2293         struct lmv_tgt_desc *sp_tgt;
2294         struct lmv_tgt_desc *tp_tgt = NULL;
2295         struct lmv_tgt_desc *child_tgt;
2296         struct lmv_tgt_desc *tgt;
2297         struct lu_fid target_fid = { 0 };
2298         int rc;
2299
2300         ENTRY;
2301
2302         LASSERT(op_data->op_cli_flags & CLI_MIGRATE);
2303
2304         CDEBUG(D_INODE, "MIGRATE "DFID"/%.*s\n",
2305                PFID(&op_data->op_fid1), (int)namelen, name);
2306
2307         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2308         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2309         op_data->op_cap = current_cap();
2310
2311         parent_tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2312         if (IS_ERR(parent_tgt))
2313                 RETURN(PTR_ERR(parent_tgt));
2314
2315         if (lmv_dir_striped(lsm)) {
2316                 const struct lmv_oinfo *oinfo;
2317
2318                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen, false);
2319                 if (IS_ERR(oinfo))
2320                         RETURN(PTR_ERR(oinfo));
2321
2322                 /* save source stripe FID in fid4 temporarily for ELC */
2323                 op_data->op_fid4 = oinfo->lmo_fid;
2324                 sp_tgt = lmv_tgt(lmv, oinfo->lmo_mds);
2325                 if (!sp_tgt)
2326                         RETURN(-ENODEV);
2327
2328                 /*
2329                  * if parent is being migrated too, fill op_fid2 with target
2330                  * stripe fid, otherwise the target stripe is not created yet.
2331                  */
2332                 if (lmv_dir_layout_changing(lsm)) {
2333                         oinfo = lsm_name_to_stripe_info(lsm, name, namelen,
2334                                                         true);
2335                         if (IS_ERR(oinfo))
2336                                 RETURN(PTR_ERR(oinfo));
2337
2338                         op_data->op_fid2 = oinfo->lmo_fid;
2339                         tp_tgt = lmv_tgt(lmv, oinfo->lmo_mds);
2340                         if (!tp_tgt)
2341                                 RETURN(-ENODEV);
2342
2343                         /* parent unchanged and update namespace only */
2344                         if (lu_fid_eq(&op_data->op_fid4, &op_data->op_fid2) &&
2345                             op_data->op_bias & MDS_MIGRATE_NSONLY)
2346                                 RETURN(-EALREADY);
2347                 }
2348         } else {
2349                 sp_tgt = parent_tgt;
2350         }
2351
2352         child_tgt = lmv_fid2tgt(lmv, &op_data->op_fid3);
2353         if (IS_ERR(child_tgt))
2354                 RETURN(PTR_ERR(child_tgt));
2355
2356         if (lmv_topdir_specific_migrate(op_data)) {
2357                 struct lmv_user_md *lum = op_data->op_data;
2358
2359                 op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset);
2360         } else if (lmv_topdir_qos_migrate(op_data)) {
2361                 tgt = lmv_locate_tgt_lf(lmv);
2362                 if (tgt == ERR_PTR(-EAGAIN))
2363                         tgt = lmv_locate_tgt_rr(lmv);
2364                 if (IS_ERR(tgt))
2365                         RETURN(PTR_ERR(tgt));
2366
2367                 op_data->op_mds = tgt->ltd_index;
2368         } else if (lmv_subdir_specific_migrate(op_data)) {
2369                 struct lmv_user_md *lum = op_data->op_data;
2370                 __u32 i;
2371
2372                 LASSERT(tp_tgt);
2373                 if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) {
2374                         /* adjust MDTs in lum, since subdir is located on where
2375                          * its parent stripe is, not the first specified MDT.
2376                          */
2377                         for (i = 0; i < le32_to_cpu(lum->lum_stripe_count);
2378                              i++) {
2379                                 if (le32_to_cpu(lum->lum_objects[i].lum_mds) ==
2380                                     tp_tgt->ltd_index)
2381                                         break;
2382                         }
2383
2384                         if (i == le32_to_cpu(lum->lum_stripe_count))
2385                                 RETURN(-ENODEV);
2386
2387                         lum->lum_objects[i].lum_mds =
2388                                 lum->lum_objects[0].lum_mds;
2389                         lum->lum_objects[0].lum_mds =
2390                                 cpu_to_le32(tp_tgt->ltd_index);
2391                 }
2392                 /* NB, the above adjusts subdir migration for command like
2393                  * "lfs migrate -m 0,1,2 ...", but for migration like
2394                  * "lfs migrate -m 0 -c 2 ...", the top dir is migrated to MDT0
2395                  * and MDT1, however its subdir may be migrated to MDT1 and MDT2
2396                  */
2397
2398                 lum->lum_stripe_offset = cpu_to_le32(tp_tgt->ltd_index);
2399                 op_data->op_mds = tp_tgt->ltd_index;
2400         } else if (tp_tgt) {
2401                 op_data->op_mds = tp_tgt->ltd_index;
2402         } else {
2403                 op_data->op_mds = sp_tgt->ltd_index;
2404         }
2405
2406         rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data);
2407         if (rc)
2408                 RETURN(rc);
2409
2410         /*
2411          * for directory, send migrate request to the MDT where the object will
2412          * be migrated to, because we can't create a striped directory remotely.
2413          *
2414          * otherwise, send to the MDT where source is located because regular
2415          * file may open lease.
2416          *
2417          * NB. if MDT doesn't support DIR_MIGRATE, send to source MDT too for
2418          * backward compatibility.
2419          */
2420         if (S_ISDIR(op_data->op_mode) &&
2421             (exp_connect_flags2(exp) & OBD_CONNECT2_DIR_MIGRATE)) {
2422                 tgt = lmv_fid2tgt(lmv, &target_fid);
2423                 if (IS_ERR(tgt))
2424                         RETURN(PTR_ERR(tgt));
2425         } else {
2426                 tgt = child_tgt;
2427         }
2428
2429         /* cancel UPDATE lock of parent master object */
2430         rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_index, LCK_EX,
2431                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2432         if (rc)
2433                 RETURN(rc);
2434
2435         /* cancel UPDATE lock of source parent */
2436         if (sp_tgt != parent_tgt) {
2437                 /*
2438                  * migrate RPC packs master object FID, because we can only pack
2439                  * two FIDs in reint RPC, but MDS needs to know both source
2440                  * parent and target parent, and it will obtain them from master
2441                  * FID and LMV, the other FID in RPC is kept for target.
2442                  *
2443                  * since this FID is not passed to MDC, cancel it anyway.
2444                  */
2445                 rc = lmv_early_cancel(exp, sp_tgt, op_data, -1, LCK_EX,
2446                                       MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID4);
2447                 if (rc)
2448                         RETURN(rc);
2449
2450                 op_data->op_flags &= ~MF_MDC_CANCEL_FID4;
2451         }
2452         op_data->op_fid4 = target_fid;
2453
2454         /* cancel UPDATE locks of target parent */
2455         rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_index, LCK_EX,
2456                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
2457         if (rc)
2458                 RETURN(rc);
2459
2460         /* cancel LOOKUP lock of source if source is remote object */
2461         if (child_tgt != sp_tgt) {
2462                 rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_index,
2463                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2464                                       MF_MDC_CANCEL_FID3);
2465                 if (rc)
2466                         RETURN(rc);
2467         }
2468
2469         /* cancel ELC locks of source */
2470         rc = lmv_early_cancel(exp, child_tgt, op_data, tgt->ltd_index, LCK_EX,
2471                               MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
2472         if (rc)
2473                 RETURN(rc);
2474
2475         rc = md_rename(tgt->ltd_exp, op_data, name, namelen, NULL, 0, request);
2476
2477         RETURN(rc);
2478 }
2479
2480 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
2481                       const char *old, size_t oldlen,
2482                       const char *new, size_t newlen,
2483                       struct ptlrpc_request **request)
2484 {
2485         struct obd_device *obd = exp->exp_obd;
2486         struct lmv_obd *lmv = &obd->u.lmv;
2487         struct lmv_tgt_desc *sp_tgt;
2488         struct lmv_tgt_desc *tp_tgt = NULL;
2489         struct lmv_tgt_desc *src_tgt = NULL;
2490         struct lmv_tgt_desc *tgt;
2491         struct mdt_body *body;
2492         int rc;
2493
2494         ENTRY;
2495
2496         LASSERT(oldlen != 0);
2497
2498         if (op_data->op_cli_flags & CLI_MIGRATE) {
2499                 rc = lmv_migrate(exp, op_data, old, oldlen, request);
2500                 RETURN(rc);
2501         }
2502
2503         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2504         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2505         op_data->op_cap = current_cap();
2506
2507         op_data->op_name = new;
2508         op_data->op_namelen = newlen;
2509
2510         tp_tgt = lmv_locate_tgt2(lmv, op_data);
2511         if (IS_ERR(tp_tgt))
2512                 RETURN(PTR_ERR(tp_tgt));
2513
2514         /* Since the target child might be destroyed, and it might become
2515          * orphan, and we can only check orphan on the local MDT right now, so
2516          * we send rename request to the MDT where target child is located. If
2517          * target child does not exist, then it will send the request to the
2518          * target parent */
2519         if (fid_is_sane(&op_data->op_fid4)) {
2520                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid4);
2521                 if (IS_ERR(tgt))
2522                         RETURN(PTR_ERR(tgt));
2523         } else {
2524                 tgt = tp_tgt;
2525         }
2526
2527         op_data->op_flags |= MF_MDC_CANCEL_FID4;
2528
2529         /* cancel UPDATE locks of target parent */
2530         rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_index, LCK_EX,
2531                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
2532         if (rc != 0)
2533                 RETURN(rc);
2534
2535         if (fid_is_sane(&op_data->op_fid4)) {
2536                 /* cancel LOOKUP lock of target on target parent */
2537                 if (tgt != tp_tgt) {
2538                         rc = lmv_early_cancel(exp, tp_tgt, op_data,
2539                                               tgt->ltd_index, LCK_EX,
2540                                               MDS_INODELOCK_LOOKUP,
2541                                               MF_MDC_CANCEL_FID4);
2542                         if (rc != 0)
2543                                 RETURN(rc);
2544                 }
2545         }
2546
2547         if (fid_is_sane(&op_data->op_fid3)) {
2548                 src_tgt = lmv_fid2tgt(lmv, &op_data->op_fid3);
2549                 if (IS_ERR(src_tgt))
2550                         RETURN(PTR_ERR(src_tgt));
2551
2552                 /* cancel ELC locks of source */
2553                 rc = lmv_early_cancel(exp, src_tgt, op_data, tgt->ltd_index,
2554                                       LCK_EX, MDS_INODELOCK_ELC,
2555                                       MF_MDC_CANCEL_FID3);
2556                 if (rc != 0)
2557                         RETURN(rc);
2558         }
2559
2560         op_data->op_name = old;
2561         op_data->op_namelen = oldlen;
2562 retry:
2563         sp_tgt = lmv_locate_tgt(lmv, op_data);
2564         if (IS_ERR(sp_tgt))
2565                 RETURN(PTR_ERR(sp_tgt));
2566
2567         /* cancel UPDATE locks of source parent */
2568         rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_index, LCK_EX,
2569                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2570         if (rc != 0)
2571                 RETURN(rc);
2572
2573         if (fid_is_sane(&op_data->op_fid3)) {
2574                 /* cancel LOOKUP lock of source on source parent */
2575                 if (src_tgt != sp_tgt) {
2576                         rc = lmv_early_cancel(exp, sp_tgt, op_data,
2577                                               tgt->ltd_index, LCK_EX,
2578                                               MDS_INODELOCK_LOOKUP,
2579                                               MF_MDC_CANCEL_FID3);
2580                         if (rc != 0)
2581                                 RETURN(rc);
2582                 }
2583         }
2584
2585 rename:
2586         CDEBUG(D_INODE, "RENAME "DFID"/%.*s to "DFID"/%.*s\n",
2587                 PFID(&op_data->op_fid1), (int)oldlen, old,
2588                 PFID(&op_data->op_fid2), (int)newlen, new);
2589
2590         rc = md_rename(tgt->ltd_exp, op_data, old, oldlen, new, newlen,
2591                         request);
2592         if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) {
2593                 ptlrpc_req_finished(*request);
2594                 *request = NULL;
2595                 goto retry;
2596         }
2597
2598         if (rc && rc != -EXDEV)
2599                 RETURN(rc);
2600
2601         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2602         if (body == NULL)
2603                 RETURN(-EPROTO);
2604
2605         /* Not cross-ref case, just get out of here. */
2606         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2607                 RETURN(rc);
2608
2609         op_data->op_fid4 = body->mbo_fid1;
2610
2611         ptlrpc_req_finished(*request);
2612         *request = NULL;
2613
2614         tgt = lmv_fid2tgt(lmv, &op_data->op_fid4);
2615         if (IS_ERR(tgt))
2616                 RETURN(PTR_ERR(tgt));
2617
2618         if (fid_is_sane(&op_data->op_fid4)) {
2619                 /* cancel LOOKUP lock of target on target parent */
2620                 if (tgt != tp_tgt) {
2621                         rc = lmv_early_cancel(exp, tp_tgt, op_data,
2622                                               tgt->ltd_index, LCK_EX,
2623                                               MDS_INODELOCK_LOOKUP,
2624                                               MF_MDC_CANCEL_FID4);
2625                         if (rc != 0)
2626                                 RETURN(rc);
2627                 }
2628         }
2629
2630         goto rename;
2631 }
2632
2633 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2634                        void *ea, size_t ealen, struct ptlrpc_request **request)
2635 {
2636         struct obd_device *obd = exp->exp_obd;
2637         struct lmv_obd *lmv = &obd->u.lmv;
2638         struct lmv_tgt_desc *tgt;
2639         int rc = 0;
2640
2641         ENTRY;
2642
2643         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x/0x%x\n",
2644                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
2645                op_data->op_xvalid);
2646
2647         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2648         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2649         if (IS_ERR(tgt))
2650                 RETURN(PTR_ERR(tgt));
2651
2652         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, request);
2653
2654         RETURN(rc);
2655 }
2656
2657 static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
2658                      struct ptlrpc_request **request)
2659 {
2660         struct obd_device *obd = exp->exp_obd;
2661         struct lmv_obd *lmv = &obd->u.lmv;
2662         struct lmv_tgt_desc *tgt;
2663         int rc;
2664
2665         ENTRY;
2666
2667         tgt = lmv_fid2tgt(lmv, fid);
2668         if (IS_ERR(tgt))
2669                 RETURN(PTR_ERR(tgt));
2670
2671         rc = md_fsync(tgt->ltd_exp, fid, request);
2672         RETURN(rc);
2673 }
2674
/* Read position inside one stripe of a striped directory */
struct stripe_dirent {
	/* page currently loaded for this stripe, NULL when none is held;
	 * released by stripe_dirent_unload() */
	struct page *sd_page;
	/* page_address() of @sd_page, set by stripe_dirent_load() */
	struct lu_dirpage *sd_dp;
	/* next entry of this stripe not yet consumed, NULL when a page has
	 * to be loaded */
	struct lu_dirent *sd_ent;
	/* no more entries will be read from this stripe (end reached, or a
	 * read error that is treated as end) */
	bool sd_eof;
};
2681
2682 struct lmv_dir_ctxt {
2683         struct lmv_obd          *ldc_lmv;
2684         struct md_op_data       *ldc_op_data;
2685         struct md_readdir_info  *ldc_mrinfo;
2686         __u64                    ldc_hash;
2687         int                      ldc_count;
2688         struct stripe_dirent     ldc_stripes[0];
2689 };
2690
2691 static inline void stripe_dirent_unload(struct stripe_dirent *stripe)
2692 {
2693         if (stripe->sd_page) {
2694                 kunmap(stripe->sd_page);
2695                 put_page(stripe->sd_page);
2696                 stripe->sd_page = NULL;
2697                 stripe->sd_ent = NULL;
2698         }
2699 }
2700
2701 static inline void put_lmv_dir_ctxt(struct lmv_dir_ctxt *ctxt)
2702 {
2703         int i;
2704
2705         for (i = 0; i < ctxt->ldc_count; i++)
2706                 stripe_dirent_unload(&ctxt->ldc_stripes[i]);
2707 }
2708
2709 /* if @ent is dummy, or . .., get next */
2710 static struct lu_dirent *stripe_dirent_get(struct lmv_dir_ctxt *ctxt,
2711                                            struct lu_dirent *ent,
2712                                            int stripe_index)
2713 {
2714         for (; ent; ent = lu_dirent_next(ent)) {
2715                 /* Skip dummy entry */
2716                 if (le16_to_cpu(ent->lde_namelen) == 0)
2717                         continue;
2718
2719                 /* skip . and .. for other stripes */
2720                 if (stripe_index &&
2721                     (strncmp(ent->lde_name, ".",
2722                              le16_to_cpu(ent->lde_namelen)) == 0 ||
2723                      strncmp(ent->lde_name, "..",
2724                              le16_to_cpu(ent->lde_namelen)) == 0))
2725                         continue;
2726
2727                 if (le64_to_cpu(ent->lde_hash) >= ctxt->ldc_hash)
2728                         break;
2729         }
2730
2731         return ent;
2732 }
2733
2734 static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt,
2735                                             struct stripe_dirent *stripe,
2736                                             int stripe_index)
2737 {
2738         struct md_op_data *op_data = ctxt->ldc_op_data;
2739         struct lmv_oinfo *oinfo;
2740         struct lu_fid fid = op_data->op_fid1;
2741         struct inode *inode = op_data->op_data;
2742         struct lmv_tgt_desc *tgt;
2743         struct lu_dirent *ent = stripe->sd_ent;
2744         __u64 hash = ctxt->ldc_hash;
2745         int rc = 0;
2746
2747         ENTRY;
2748
2749         LASSERT(stripe == &ctxt->ldc_stripes[stripe_index]);
2750         LASSERT(!ent);
2751
2752         do {
2753                 if (stripe->sd_page) {
2754                         __u64 end = le64_to_cpu(stripe->sd_dp->ldp_hash_end);
2755
2756                         /* @hash should be the last dirent hash */
2757                         LASSERTF(hash <= end,
2758                                  "ctxt@%p stripe@%p hash %llx end %llx\n",
2759                                  ctxt, stripe, hash, end);
2760                         /* unload last page */
2761                         stripe_dirent_unload(stripe);
2762                         /* eof */
2763                         if (end == MDS_DIR_END_OFF) {
2764                                 stripe->sd_eof = true;
2765                                 break;
2766                         }
2767                         hash = end;
2768                 }
2769
2770                 oinfo = &op_data->op_mea1->lsm_md_oinfo[stripe_index];
2771                 if (!oinfo->lmo_root) {
2772                         rc = -ENOENT;
2773                         break;
2774                 }
2775
2776                 tgt = lmv_tgt(ctxt->ldc_lmv, oinfo->lmo_mds);
2777                 if (!tgt) {
2778                         rc = -ENODEV;
2779                         break;
2780                 }
2781
2782                 /* op_data is shared by stripes, reset after use */
2783                 op_data->op_fid1 = oinfo->lmo_fid;
2784                 op_data->op_fid2 = oinfo->lmo_fid;
2785                 op_data->op_data = oinfo->lmo_root;
2786
2787                 rc = md_read_page(tgt->ltd_exp, op_data, ctxt->ldc_mrinfo, hash,
2788                                   &stripe->sd_page);
2789
2790                 op_data->op_fid1 = fid;
2791                 op_data->op_fid2 = fid;
2792                 op_data->op_data = inode;
2793
2794                 if (rc)
2795                         break;
2796
2797                 stripe->sd_dp = page_address(stripe->sd_page);
2798                 ent = stripe_dirent_get(ctxt, lu_dirent_start(stripe->sd_dp),
2799                                         stripe_index);
2800                 /* in case a page filled with ., .. and dummy, read next */
2801         } while (!ent);
2802
2803         stripe->sd_ent = ent;
2804         if (rc) {
2805                 LASSERT(!ent);
2806                 /* treat error as eof, so dir can be partially accessed */
2807                 stripe->sd_eof = true;
2808                 ctxt->ldc_mrinfo->mr_partial_readdir_rc = rc;
2809                 LCONSOLE_WARN("dir "DFID" stripe %d readdir failed: %d, "
2810                               "directory is partially accessed!\n",
2811                               PFID(&ctxt->ldc_op_data->op_fid1), stripe_index,
2812                               rc);
2813         }
2814
2815         RETURN(ent);
2816 }
2817
2818 static int lmv_file_resync(struct obd_export *exp, struct md_op_data *data)
2819 {
2820         struct obd_device *obd = exp->exp_obd;
2821         struct lmv_obd *lmv = &obd->u.lmv;
2822         struct lmv_tgt_desc *tgt;
2823         int rc;
2824
2825         ENTRY;
2826
2827         rc = lmv_check_connect(obd);
2828         if (rc != 0)
2829                 RETURN(rc);
2830
2831         tgt = lmv_fid2tgt(lmv, &data->op_fid1);
2832         if (IS_ERR(tgt))
2833                 RETURN(PTR_ERR(tgt));
2834
2835         data->op_flags |= MF_MDC_CANCEL_FID1;
2836         rc = md_file_resync(tgt->ltd_exp, data);
2837         RETURN(rc);
2838 }
2839
2840 /**
2841  * Get dirent with the closest hash for striped directory
2842  *
2843  * This function will search the dir entry, whose hash value is the
2844  * closest(>=) to hash from all of sub-stripes, and it is only being called
2845  * for striped directory.
2846  *
2847  * \param[in] ctxt              dir read context
2848  *
2849  * \retval                      dirent get the entry successfully
2850  *                              NULL does not get the entry, normally it means
2851  *                              it reaches the end of the directory, while read
2852  *                              stripe dirent error is ignored to allow partial
2853  *                              access.
2854  */
2855 static struct lu_dirent *lmv_dirent_next(struct lmv_dir_ctxt *ctxt)
2856 {
2857         struct stripe_dirent *stripe;
2858         struct lu_dirent *ent = NULL;
2859         int i;
2860         int min = -1;
2861
2862         /* TODO: optimize with k-way merge sort */
2863         for (i = 0; i < ctxt->ldc_count; i++) {
2864                 stripe = &ctxt->ldc_stripes[i];
2865                 if (stripe->sd_eof)
2866                         continue;
2867
2868                 if (!stripe->sd_ent) {
2869                         stripe_dirent_load(ctxt, stripe, i);
2870                         if (!stripe->sd_ent) {
2871                                 LASSERT(stripe->sd_eof);
2872                                 continue;
2873                         }
2874                 }
2875
2876                 if (min == -1 ||
2877                     le64_to_cpu(ctxt->ldc_stripes[min].sd_ent->lde_hash) >
2878                     le64_to_cpu(stripe->sd_ent->lde_hash)) {
2879                         min = i;
2880                         if (le64_to_cpu(stripe->sd_ent->lde_hash) ==
2881                             ctxt->ldc_hash)
2882                                 break;
2883                 }
2884         }
2885
2886         if (min != -1) {
2887                 stripe = &ctxt->ldc_stripes[min];
2888                 ent = stripe->sd_ent;
2889                 /* pop found dirent */
2890                 stripe->sd_ent = stripe_dirent_get(ctxt, lu_dirent_next(ent),
2891                                                    min);
2892         }
2893
2894         return ent;
2895 }
2896
2897 /**
2898  * Build dir entry page for striped directory
2899  *
2900  * This function gets one entry by @offset from a striped directory. It will
2901  * read entries from all of stripes, and choose one closest to the required
2902  * offset(&offset). A few notes
2903  * 1. skip . and .. for non-zero stripes, because there can only have one .
2904  * and .. in a directory.
2905  * 2. op_data will be shared by all of stripes, instead of allocating new
2906  * one, so need to restore before reusing.
2907  *
2908  * \param[in] exp       obd export refer to LMV
2909  * \param[in] op_data   hold those MD parameters of read_entry
2910  * \param[in] mrinfo    ldlm callback being used in enqueue in mdc_read_entry,
2911  *                      and partial readdir result will be stored in it.
2912  * \param[in] offset    starting hash offset
2913  * \param[out] ppage    the page holding the entry. Note: because the entry
2914  *                      will be accessed in upper layer, so we need hold the
2915  *                      page until the usages of entry is finished, see
2916  *                      ll_dir_entry_next.
2917  *
2918  * retval               =0 if get entry successfully
2919  *                      <0 cannot get entry
2920  */
2921 static int lmv_striped_read_page(struct obd_export *exp,
2922                                  struct md_op_data *op_data,
2923                                  struct md_readdir_info *mrinfo, __u64 offset,
2924                                  struct page **ppage)
2925 {
2926         struct page *page = NULL;
2927         struct lu_dirpage *dp;
2928         void *start;
2929         struct lu_dirent *ent;
2930         struct lu_dirent *last_ent;
2931         int stripe_count;
2932         struct lmv_dir_ctxt *ctxt;
2933         struct lu_dirent *next = NULL;
2934         __u16 ent_size;
2935         size_t left_bytes;
2936         int rc = 0;
2937         ENTRY;
2938
2939         /* Allocate a page and read entries from all of stripes and fill
2940          * the page by hash order */
2941         page = alloc_page(GFP_KERNEL);
2942         if (!page)
2943                 RETURN(-ENOMEM);
2944
2945         /* Initialize the entry page */
2946         dp = kmap(page);
2947         memset(dp, 0, sizeof(*dp));
2948         dp->ldp_hash_start = cpu_to_le64(offset);
2949
2950         start = dp + 1;
2951         left_bytes = PAGE_SIZE - sizeof(*dp);
2952         ent = start;
2953         last_ent = ent;
2954
2955         /* initalize dir read context */
2956         stripe_count = op_data->op_mea1->lsm_md_stripe_count;
2957         OBD_ALLOC(ctxt, offsetof(typeof(*ctxt), ldc_stripes[stripe_count]));
2958         if (!ctxt)
2959                 GOTO(free_page, rc = -ENOMEM);
2960         ctxt->ldc_lmv = &exp->exp_obd->u.lmv;
2961         ctxt->ldc_op_data = op_data;
2962         ctxt->ldc_mrinfo = mrinfo;
2963         ctxt->ldc_hash = offset;
2964         ctxt->ldc_count = stripe_count;
2965
2966         while (1) {
2967                 next = lmv_dirent_next(ctxt);
2968
2969                 /* end of directory */
2970                 if (!next) {
2971                         ctxt->ldc_hash = MDS_DIR_END_OFF;
2972                         break;
2973                 }
2974                 ctxt->ldc_hash = le64_to_cpu(next->lde_hash);
2975
2976                 ent_size = le16_to_cpu(next->lde_reclen);
2977
2978                 /* the last entry lde_reclen is 0, but it might not be the last
2979                  * one of this temporay dir page */
2980                 if (!ent_size)
2981                         ent_size = lu_dirent_calc_size(
2982                                         le16_to_cpu(next->lde_namelen),
2983                                         le32_to_cpu(next->lde_attrs));
2984                 /* page full */
2985                 if (ent_size > left_bytes)
2986                         break;
2987
2988                 memcpy(ent, next, ent_size);
2989
2990                 /* Replace . with master FID and Replace .. with the parent FID
2991                  * of master object */
2992                 if (strncmp(ent->lde_name, ".",
2993                             le16_to_cpu(ent->lde_namelen)) == 0 &&
2994                     le16_to_cpu(ent->lde_namelen) == 1)
2995                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid1);
2996                 else if (strncmp(ent->lde_name, "..",
2997                                    le16_to_cpu(ent->lde_namelen)) == 0 &&
2998                            le16_to_cpu(ent->lde_namelen) == 2)
2999                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
3000
3001                 CDEBUG(D_INODE, "entry %.*s hash %#llx\n",
3002                        le16_to_cpu(ent->lde_namelen), ent->lde_name,
3003                        le64_to_cpu(ent->lde_hash));
3004
3005                 left_bytes -= ent_size;
3006                 ent->lde_reclen = cpu_to_le16(ent_size);
3007                 last_ent = ent;
3008                 ent = (void *)ent + ent_size;
3009         };
3010
3011         last_ent->lde_reclen = 0;
3012
3013         if (ent == start)
3014                 dp->ldp_flags |= LDF_EMPTY;
3015         else if (ctxt->ldc_hash == le64_to_cpu(last_ent->lde_hash))
3016                 dp->ldp_flags |= LDF_COLLIDE;
3017         dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
3018         dp->ldp_hash_end = cpu_to_le64(ctxt->ldc_hash);
3019
3020         put_lmv_dir_ctxt(ctxt);
3021         OBD_FREE(ctxt, offsetof(typeof(*ctxt), ldc_stripes[stripe_count]));
3022
3023         *ppage = page;
3024
3025         RETURN(0);
3026
3027 free_page:
3028         kunmap(page);
3029         __free_page(page);
3030
3031         return rc;
3032 }
3033
3034 static int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
3035                          struct md_readdir_info *mrinfo, __u64 offset,
3036                          struct page **ppage)
3037 {
3038         struct obd_device *obd = exp->exp_obd;
3039         struct lmv_obd *lmv = &obd->u.lmv;
3040         struct lmv_tgt_desc *tgt;
3041         int rc;
3042
3043         ENTRY;
3044
3045         if (unlikely(lmv_dir_foreign(op_data->op_mea1)))
3046                 RETURN(-ENODATA);
3047
3048         if (unlikely(lmv_dir_striped(op_data->op_mea1))) {
3049                 rc = lmv_striped_read_page(exp, op_data, mrinfo, offset, ppage);
3050                 RETURN(rc);
3051         }
3052
3053         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
3054         if (IS_ERR(tgt))
3055                 RETURN(PTR_ERR(tgt));
3056
3057         rc = md_read_page(tgt->ltd_exp, op_data, mrinfo, offset, ppage);
3058
3059         RETURN(rc);
3060 }
3061
3062 /**
3063  * Unlink a file/directory
3064  *
3065  * Unlink a file or directory under the parent dir. The unlink request
3066  * usually will be sent to the MDT where the child is located, but if
3067  * the client does not have the child FID then request will be sent to the
3068  * MDT where the parent is located.
3069  *
3070  * If the parent is a striped directory then it also needs to locate which
3071  * stripe the name of the child is located, and replace the parent FID
3072  * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
3073  * it will walk through all of sub-stripes until the child is being
3074  * unlinked finally.
3075  *
3076  * \param[in] exp       export refer to LMV
3077  * \param[in] op_data   different parameters transferred beween client
3078  *                      MD stacks, name, namelen, FIDs etc.
3079  *                      op_fid1 is the parent FID, op_fid2 is the child
3080  *                      FID.
3081  * \param[out] request  point to the request of unlink.
3082  *
3083  * retval               0 if succeed
3084  *                      negative errno if failed.
3085  */
3086 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
3087                       struct ptlrpc_request **request)
3088 {
3089         struct obd_device *obd = exp->exp_obd;
3090         struct lmv_obd *lmv = &obd->u.lmv;
3091         struct lmv_tgt_desc *tgt;
3092         struct lmv_tgt_desc *parent_tgt;
3093         struct mdt_body *body;
3094         int rc;
3095
3096         ENTRY;
3097
3098         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
3099         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
3100         op_data->op_cap = current_cap();
3101
3102 retry:
3103         parent_tgt = lmv_locate_tgt(lmv, op_data);
3104         if (IS_ERR(parent_tgt))
3105                 RETURN(PTR_ERR(parent_tgt));
3106
3107         if (likely(!fid_is_zero(&op_data->op_fid2))) {
3108                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
3109                 if (IS_ERR(tgt))
3110                         RETURN(PTR_ERR(tgt));
3111         } else {
3112                 tgt = parent_tgt;
3113         }
3114
3115         /*
3116          * If child's fid is given, cancel unused locks for it if it is from
3117          * another export than parent.
3118          *
3119          * LOOKUP lock for child (fid3) should also be cancelled on parent
3120          * tgt_tgt in mdc_unlink().
3121          */
3122         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
3123
3124         if (parent_tgt != tgt)
3125                 rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_index,
3126                                       LCK_EX, MDS_INODELOCK_LOOKUP,
3127                                       MF_MDC_CANCEL_FID3);
3128
3129         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_index, LCK_EX,
3130                               MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
3131         if (rc)
3132                 RETURN(rc);
3133
3134         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%u\n",
3135                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
3136                tgt->ltd_index);
3137
3138         rc = md_unlink(tgt->ltd_exp, op_data, request);
3139         if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) {
3140                 ptlrpc_req_finished(*request);
3141                 *request = NULL;
3142                 goto retry;
3143         }
3144
3145         if (rc != -EREMOTE)
3146                 RETURN(rc);
3147
3148         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
3149         if (body == NULL)
3150                 RETURN(-EPROTO);
3151
3152         /* Not cross-ref case, just get out of here. */
3153         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
3154                 RETURN(rc);
3155
3156         /* This is a remote object, try remote MDT. */
3157         op_data->op_fid2 = body->mbo_fid1;
3158         ptlrpc_req_finished(*request);
3159         *request = NULL;
3160
3161         tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
3162         if (IS_ERR(tgt))
3163                 RETURN(PTR_ERR(tgt));
3164
3165         goto retry;
3166 }
3167
3168 static int lmv_precleanup(struct obd_device *obd)
3169 {
3170         ENTRY;
3171         libcfs_kkuc_group_rem(&obd->obd_uuid, 0, KUC_GRP_HSM);
3172         RETURN(0);
3173 }
3174
3175 /**
3176  * Get by key a value associated with a LMV device.
3177  *
3178  * Dispatch request to lower-layer devices as needed.
3179  *
3180  * \param[in] env               execution environment for this thread
3181  * \param[in] exp               export for the LMV device
3182  * \param[in] keylen            length of key identifier
3183  * \param[in] key               identifier of key to get value for
3184  * \param[in] vallen            size of \a val
3185  * \param[out] val              pointer to storage location for value
3186  * \param[in] lsm               optional striping metadata of object
3187  *
3188  * \retval 0            on success
3189  * \retval negative     negated errno on failure
3190  */
3191 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
3192                         __u32 keylen, void *key, __u32 *vallen, void *val)
3193 {
3194         struct obd_device *obd;
3195         struct lmv_obd *lmv;
3196         struct lu_tgt_desc *tgt;
3197         int rc = 0;
3198
3199         ENTRY;
3200
3201         obd = class_exp2obd(exp);
3202         if (obd == NULL) {
3203                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
3204                        exp->exp_handle.h_cookie);
3205                 RETURN(-EINVAL);
3206         }
3207
3208         lmv = &obd->u.lmv;
3209         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
3210                 LASSERT(*vallen == sizeof(__u32));
3211                 lmv_foreach_connected_tgt(lmv, tgt) {
3212                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
3213                                           vallen, val))
3214                                 RETURN(0);
3215                 }
3216                 RETURN(-EINVAL);
3217         } else if (KEY_IS(KEY_MAX_EASIZE) ||
3218                    KEY_IS(KEY_DEFAULT_EASIZE) ||
3219                    KEY_IS(KEY_CONN_DATA)) {
3220                 /*
3221                  * Forwarding this request to first MDS, it should know LOV
3222                  * desc.
3223                  */
3224                 tgt = lmv_tgt(lmv, 0);
3225                 if (!tgt)
3226                         RETURN(-ENODEV);
3227
3228                 rc = obd_get_info(env, tgt->ltd_exp, keylen, key, vallen, val);
3229                 if (!rc && KEY_IS(KEY_CONN_DATA))
3230                         exp->exp_connect_data = *(struct obd_connect_data *)val;
3231                 RETURN(rc);
3232         } else if (KEY_IS(KEY_TGT_COUNT)) {
3233                 *((int *)val) = lmv->lmv_mdt_descs.ltd_tgts_size;
3234                 RETURN(0);
3235         }
3236
3237         CDEBUG(D_IOCTL, "Invalid key\n");
3238         RETURN(-EINVAL);
3239 }
3240
3241 static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa,
3242                      int *__rcs, struct ptlrpc_request_set *_set)
3243 {
3244         struct obd_device *obd = class_exp2obd(exp);
3245         struct ptlrpc_request_set *set = _set;
3246         struct lmv_obd *lmv = &obd->u.lmv;
3247         int tgt_count = lmv->lmv_mdt_count;
3248         struct lu_tgt_desc *tgt;
3249         struct fid_array *fat, **fas = NULL;
3250         int i, rc, **rcs = NULL;
3251
3252         if (!set) {
3253                 set = ptlrpc_prep_set();
3254                 if (!set)
3255                         RETURN(-ENOMEM);
3256         }
3257
3258         /* split FIDs by targets */
3259         OBD_ALLOC_PTR_ARRAY(fas, tgt_count);
3260         if (fas == NULL)
3261                 GOTO(out, rc = -ENOMEM);
3262         OBD_ALLOC_PTR_ARRAY(rcs, tgt_count);
3263         if (rcs == NULL)
3264                 GOTO(out_fas, rc = -ENOMEM);
3265
3266         for (i = 0; i < fa->fa_nr; i++) {
3267                 unsigned int idx;
3268
3269                 rc = lmv_fld_lookup(lmv, &fa->fa_fids[i], &idx);
3270                 if (rc) {
3271                         CDEBUG(D_OTHER, "can't lookup "DFID": rc = %d\n",
3272                                PFID(&fa->fa_fids[i]), rc);
3273                         continue;
3274                 }
3275                 LASSERT(idx < tgt_count);
3276                 if (!fas[idx])
3277                         OBD_ALLOC(fas[idx], offsetof(struct fid_array,
3278                                   fa_fids[fa->fa_nr]));
3279                 if (!fas[idx])
3280                         GOTO(out, rc = -ENOMEM);
3281                 if (!rcs[idx])
3282                         OBD_ALLOC_PTR_ARRAY(rcs[idx], fa->fa_nr);
3283                 if (!rcs[idx])
3284                         GOTO(out, rc = -ENOMEM);
3285
3286                 fat = fas[idx];
3287                 fat->fa_fids[fat->fa_nr++] = fa->fa_fids[i];
3288         }
3289
3290         lmv_foreach_connected_tgt(lmv, tgt) {
3291                 fat = fas[tgt->ltd_index];
3292                 if (!fat || fat->fa_nr == 0)
3293                         continue;
3294                 rc = md_rmfid(tgt->ltd_exp, fat, rcs[tgt->ltd_index], set);
3295         }
3296
3297         rc = ptlrpc_set_wait(NULL, set);
3298         if (rc == 0) {
3299                 int j = 0;
3300                 for (i = 0; i < tgt_count; i++) {
3301                         fat = fas[i];
3302                         if (!fat || fat->fa_nr == 0)
3303                                 continue;
3304                         /* copy FIDs back */
3305                         memcpy(fa->fa_fids + j, fat->fa_fids,
3306                                fat->fa_nr * sizeof(struct lu_fid));
3307                         /* copy rcs back */
3308                         memcpy(__rcs + j, rcs[i], fat->fa_nr * sizeof(**rcs));
3309                         j += fat->fa_nr;
3310                 }
3311         }
3312         if (set != _set)
3313                 ptlrpc_set_destroy(set);
3314
3315 out:
3316         for (i = 0; i < tgt_count; i++) {
3317                 if (fas && fas[i])
3318                         OBD_FREE(fas[i], offsetof(struct fid_array,
3319                                                 fa_fids[fa->fa_nr]));
3320                 if (rcs && rcs[i])
3321                         OBD_FREE_PTR_ARRAY(rcs[i], fa->fa_nr);
3322         }
3323         if (rcs)
3324                 OBD_FREE_PTR_ARRAY(rcs, tgt_count);
3325 out_fas:
3326         if (fas)
3327                 OBD_FREE_PTR_ARRAY(fas, tgt_count);
3328
3329         RETURN(rc);
3330 }
3331
3332 /**
3333  * Asynchronously set by key a value associated with a LMV device.
3334  *
3335  * Dispatch request to lower-layer devices as needed.
3336  *
3337  * \param[in] env       execution environment for this thread
3338  * \param[in] exp       export for the LMV device
3339  * \param[in] keylen    length of key identifier
3340  * \param[in] key       identifier of key to store value for
3341  * \param[in] vallen    size of value to store
3342  * \param[in] val       pointer to data to be stored
3343  * \param[in] set       optional list of related ptlrpc requests
3344  *
3345  * \retval 0            on success
3346  * \retval negative     negated errno on failure
3347  */
3348 static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
3349                               __u32 keylen, void *key, __u32 vallen, void *val,
3350                               struct ptlrpc_request_set *set)
3351 {
3352         struct lmv_tgt_desc *tgt;
3353         struct obd_device *obd;
3354         struct lmv_obd *lmv;
3355         int rc = 0;
3356         ENTRY;
3357
3358         obd = class_exp2obd(exp);
3359         if (obd == NULL) {
3360                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
3361                        exp->exp_handle.h_cookie);
3362                 RETURN(-EINVAL);
3363         }
3364         lmv = &obd->u.lmv;
3365
3366         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
3367             KEY_IS(KEY_DEFAULT_EASIZE)) {
3368                 int err = 0;
3369
3370                 lmv_foreach_connected_tgt(lmv, tgt) {
3371                         err = obd_set_info_async(env, tgt->ltd_exp,
3372                                                  keylen, key, vallen, val, set);
3373                         if (err && rc == 0)
3374                                 rc = err;
3375                 }
3376
3377                 RETURN(rc);
3378         }
3379
3380         RETURN(-EINVAL);
3381 }
3382
3383 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
3384                             const struct lmv_mds_md_v1 *lmm1)
3385 {
3386         struct lmv_obd  *lmv = &exp->exp_obd->u.lmv;
3387         int             stripe_count;
3388         int             cplen;
3389         int             i;
3390         int             rc = 0;
3391         ENTRY;
3392
3393         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
3394         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
3395         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
3396         if (CFS_FAIL_CHECK(OBD_FAIL_LMV_UNKNOWN_STRIPE))
3397                 lsm->lsm_md_hash_type = cfs_fail_val ?: LMV_HASH_TYPE_UNKNOWN;
3398         else
3399                 lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
3400         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
3401         lsm->lsm_md_migrate_offset = le32_to_cpu(lmm1->lmv_migrate_offset);
3402         lsm->lsm_md_migrate_hash = le32_to_cpu(lmm1->lmv_migrate_hash);
3403         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
3404                         sizeof(lsm->lsm_md_pool_name));
3405
3406         if (cplen >= sizeof(lsm->lsm_md_pool_name))
3407                 RETURN(-E2BIG);
3408
3409         CDEBUG(D_INFO, "unpack lsm count %d/%d, master %d hash_type %#x/%#x "
3410                "layout_version %d\n", lsm->lsm_md_stripe_count,
3411                lsm->lsm_md_migrate_offset, lsm->lsm_md_master_mdt_index,
3412                lsm->lsm_md_hash_type, lsm->lsm_md_migrate_hash,
3413                lsm->lsm_md_layout_version);
3414
3415         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
3416         for (i = 0; i < stripe_count; i++) {
3417                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
3418                               &lmm1->lmv_stripe_fids[i]);
3419                 /*
3420                  * set default value -1, so lmv_locate_tgt() knows this stripe
3421                  * target is not initialized.
3422                  */
3423                 lsm->lsm_md_oinfo[i].lmo_mds = LMV_OFFSET_DEFAULT;
3424                 if (!fid_is_sane(&lsm->lsm_md_oinfo[i].lmo_fid))
3425                         continue;
3426
3427                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
3428                                     &lsm->lsm_md_oinfo[i].lmo_mds);
3429                 if (rc == -ENOENT)
3430                         continue;
3431
3432                 if (rc)
3433                         RETURN(rc);
3434
3435                 CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
3436                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
3437         }
3438
3439         RETURN(rc);
3440 }
3441
3442 static inline int lmv_unpack_user_md(struct obd_export *exp,
3443                                      struct lmv_stripe_md *lsm,
3444                                      const struct lmv_user_md *lmu)
3445 {
3446         lsm->lsm_md_magic = le32_to_cpu(lmu->lum_magic);
3447         lsm->lsm_md_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
3448         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmu->lum_stripe_offset);
3449         lsm->lsm_md_hash_type = le32_to_cpu(lmu->lum_hash_type);
3450         lsm->lsm_md_max_inherit = lmu->lum_max_inherit;
3451         lsm->lsm_md_max_inherit_rr = lmu->lum_max_inherit_rr;
3452         lsm->lsm_md_pool_name[LOV_MAXPOOLNAME] = 0;
3453
3454         return 0;
3455 }
3456
3457 static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
3458                         const union lmv_mds_md *lmm, size_t lmm_size)
3459 {
3460         struct lmv_stripe_md     *lsm;
3461         int                      lsm_size;
3462         int                      rc;
3463         bool                     allocated = false;
3464         ENTRY;
3465
3466         LASSERT(lsmp != NULL);
3467
3468         lsm = *lsmp;
3469         /* Free memmd */
3470         if (lsm != NULL && lmm == NULL) {
3471                 int i;
3472                 struct lmv_foreign_md *lfm = (struct lmv_foreign_md *)lsm;
3473
3474                 if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) {
3475                         size_t lfm_size;
3476
3477                         lfm_size = lfm->lfm_length + offsetof(typeof(*lfm),
3478                                                               lfm_value[0]);
3479                         OBD_FREE_LARGE(lfm, lfm_size);
3480                         RETURN(0);
3481                 }
3482
3483                 if (lmv_dir_striped(lsm)) {
3484                         for (i = 0; i < lsm->lsm_md_stripe_count; i++)
3485                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
3486                         lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
3487                 } else {
3488                         lsm_size = lmv_stripe_md_size(0);
3489                 }
3490                 OBD_FREE(lsm, lsm_size);
3491                 *lsmp = NULL;
3492                 RETURN(0);
3493         }
3494
3495         /* foreign lmv case */
3496         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_FOREIGN) {
3497                 struct lmv_foreign_md *lfm = (struct lmv_foreign_md *)lsm;
3498
3499                 if (lfm == NULL) {
3500                         OBD_ALLOC_LARGE(lfm, lmm_size);
3501                         if (lfm == NULL)
3502                                 RETURN(-ENOMEM);
3503                         *lsmp = (struct lmv_stripe_md *)lfm;
3504                 }
3505                 lfm->lfm_magic = le32_to_cpu(lmm->lmv_foreign_md.lfm_magic);
3506                 lfm->lfm_length = le32_to_cpu(lmm->lmv_foreign_md.lfm_length);
3507                 lfm->lfm_type = le32_to_cpu(lmm->lmv_foreign_md.lfm_type);
3508                 lfm->lfm_flags = le32_to_cpu(lmm->lmv_foreign_md.lfm_flags);
3509                 memcpy(&lfm->lfm_value, &lmm->lmv_foreign_md.lfm_value,
3510                        lfm->lfm_length);
3511                 RETURN(lmm_size);
3512         }
3513
3514         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
3515                 RETURN(-EPERM);
3516
3517         /* Unpack memmd */
3518         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
3519             le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
3520                 CERROR("%s: invalid lmv magic %x: rc = %d\n",
3521                        exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
3522                        -EIO);
3523                 RETURN(-EIO);
3524         }
3525
3526         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
3527                 lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
3528         else
3529                 /**
3530                  * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
3531                  * stripecount should be 0 then.
3532                  */
3533                 lsm_size = lmv_stripe_md_size(0);
3534
3535         if (lsm == NULL) {
3536                 OBD_ALLOC(lsm, lsm_size);
3537                 if (lsm == NULL)
3538                         RETURN(-ENOMEM);
3539                 allocated = true;
3540                 *lsmp = lsm;
3541         }
3542
3543         switch (le32_to_cpu(lmm->lmv_magic)) {
3544         case LMV_MAGIC_V1:
3545                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
3546                 break;
3547         case LMV_USER_MAGIC:
3548                 rc = lmv_unpack_user_md(exp, lsm, &lmm->lmv_user_md);
3549                 break;
3550         default:
3551                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
3552                        le32_to_cpu(lmm->lmv_magic));
3553                 rc = -EINVAL;
3554                 break;
3555         }
3556
3557         if (rc != 0 && allocated) {
3558                 OBD_FREE(lsm, lsm_size);
3559                 *lsmp = NULL;
3560                 lsm_size = rc;
3561         }
3562         RETURN(lsm_size);
3563 }
3564
3565 void lmv_free_memmd(struct lmv_stripe_md *lsm)
3566 {
3567         lmv_unpackmd(NULL, &lsm, NULL, 0);
3568 }
3569 EXPORT_SYMBOL(lmv_free_memmd);
3570
3571 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
3572                              union ldlm_policy_data *policy,
3573                              enum ldlm_mode mode, enum ldlm_cancel_flags flags,
3574                              void *opaque)
3575 {
3576         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
3577         struct lu_tgt_desc *tgt;
3578         int err;
3579         int rc = 0;
3580
3581         ENTRY;
3582
3583         LASSERT(fid != NULL);
3584
3585         lmv_foreach_connected_tgt(lmv, tgt) {
3586                 if (!tgt->ltd_active)
3587                         continue;
3588
3589                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
3590                                        opaque);
3591                 if (!rc)
3592                         rc = err;
3593         }
3594         RETURN(rc);
3595 }
3596
3597 static int lmv_set_lock_data(struct obd_export *exp,
3598                              const struct lustre_handle *lockh,
3599                              void *data, __u64 *bits)
3600 {
3601         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
3602         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3603         int rc;
3604
3605         ENTRY;
3606
3607         if (tgt == NULL || tgt->ltd_exp == NULL)
3608                 RETURN(-EINVAL);
3609         rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
3610         RETURN(rc);
3611 }
3612
3613 static enum ldlm_mode
3614 lmv_lock_match(struct obd_export *exp, __u64 flags,
3615                const struct lu_fid *fid, enum ldlm_type type,
3616                union ldlm_policy_data *policy,
3617                enum ldlm_mode mode, struct lustre_handle *lockh)
3618 {
3619         struct obd_device *obd = exp->exp_obd;
3620         struct lmv_obd *lmv = &obd->u.lmv;
3621         struct lu_tgt_desc *tgt;
3622         __u64 bits = policy->l_inodebits.bits;
3623         enum ldlm_mode rc = LCK_MINMODE;
3624         int index;
3625         int i;
3626
3627         /* only one bit is set */
3628         LASSERT(bits && !(bits & (bits - 1)));
3629         /* With DNE every object can have two locks in different namespaces:
3630          * lookup lock in space of MDT storing direntry and update/open lock in
3631          * space of MDT storing inode.  Try the MDT that the FID maps to first,
3632          * since this can be easily found, and only try others if that fails.
3633          */
3634         if (bits == MDS_INODELOCK_LOOKUP) {
3635                 for (i = 0, index = lmv_fid2tgt_index(lmv, fid);
3636                      i < lmv->lmv_mdt_descs.ltd_tgts_size; i++,
3637                      index = (index + 1) % lmv->lmv_mdt_descs.ltd_tgts_size) {
3638                         if (index < 0) {
3639                                 CDEBUG(D_HA,
3640                                        "%s: "DFID" is inaccessible: rc = %d\n",
3641                                        obd->obd_name, PFID(fid), index);
3642                                 index = 0;
3643                         }
3644                         tgt = lmv_tgt(lmv, index);
3645                         if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
3646                                 continue;
3647                         rc = md_lock_match(tgt->ltd_exp, flags, fid, type,
3648                                            policy, mode, lockh);
3649                         if (rc)
3650                                 break;
3651                 }
3652         } else {
3653                 tgt = lmv_fid2tgt(lmv, fid);
3654                 if (!IS_ERR(tgt) && tgt->ltd_exp && tgt->ltd_active)
3655                         rc = md_lock_match(tgt->ltd_exp, flags, fid, type,
3656                                            policy, mode, lockh);
3657         }
3658
3659         CDEBUG(D_INODE, "Lock match for "DFID": %d\n", PFID(fid), rc);
3660
3661         return rc;
3662 }
3663
3664 static int
3665 lmv_get_lustre_md(struct obd_export *exp, struct req_capsule *pill,
3666                   struct obd_export *dt_exp, struct obd_export *md_exp,
3667                   struct lustre_md *md)
3668 {
3669         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
3670         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3671
3672         if (!tgt || !tgt->ltd_exp)
3673                 return -EINVAL;
3674
3675         return md_get_lustre_md(tgt->ltd_exp, pill, dt_exp, md_exp, md);
3676 }
3677
3678 static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
3679 {
3680         struct obd_device *obd = exp->exp_obd;
3681         struct lmv_obd *lmv = &obd->u.lmv;
3682         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3683
3684         ENTRY;
3685
3686         if (md->default_lmv) {
3687                 lmv_free_memmd(md->default_lmv);
3688                 md->default_lmv = NULL;
3689         }
3690         if (md->lmv != NULL) {
3691                 lmv_free_memmd(md->lmv);
3692                 md->lmv = NULL;
3693         }
3694         if (!tgt || !tgt->ltd_exp)
3695                 RETURN(-EINVAL);
3696         RETURN(md_free_lustre_md(tgt->ltd_exp, md));
3697 }
3698
3699 static int lmv_set_open_replay_data(struct obd_export *exp,
3700                                     struct obd_client_handle *och,
3701                                     struct lookup_intent *it)
3702 {
3703         struct obd_device *obd = exp->exp_obd;
3704         struct lmv_obd *lmv = &obd->u.lmv;
3705         struct lmv_tgt_desc *tgt;
3706
3707         ENTRY;
3708
3709         tgt = lmv_fid2tgt(lmv, &och->och_fid);
3710         if (IS_ERR(tgt))
3711                 RETURN(PTR_ERR(tgt));
3712
3713         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it));
3714 }
3715
3716 static int lmv_clear_open_replay_data(struct obd_export *exp,
3717                                       struct obd_client_handle *och)
3718 {
3719         struct obd_device *obd = exp->exp_obd;
3720         struct lmv_obd *lmv = &obd->u.lmv;
3721         struct lmv_tgt_desc *tgt;
3722
3723         ENTRY;
3724
3725         tgt = lmv_fid2tgt(lmv, &och->och_fid);
3726         if (IS_ERR(tgt))
3727                 RETURN(PTR_ERR(tgt));
3728
3729         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
3730 }
3731
3732 static int lmv_intent_getattr_async(struct obd_export *exp,
3733                                     struct md_op_item *item)
3734 {
3735         struct md_op_data *op_data = &item->mop_data;
3736         struct obd_device *obd = exp->exp_obd;
3737         struct lmv_obd *lmv = &obd->u.lmv;
3738         struct lmv_tgt_desc *ptgt;
3739         struct lmv_tgt_desc *ctgt;
3740         int rc;
3741
3742         ENTRY;
3743
3744         if (!fid_is_sane(&op_data->op_fid2))
3745                 RETURN(-EINVAL);
3746
3747         ptgt = lmv_locate_tgt(lmv, op_data);
3748         if (IS_ERR(ptgt))
3749                 RETURN(PTR_ERR(ptgt));
3750
3751         ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
3752         if (IS_ERR(ctgt))
3753                 RETURN(PTR_ERR(ctgt));
3754
3755         /*
3756          * remote object needs two RPCs to lookup and getattr, considering the
3757          * complexity don't support statahead for now.
3758          */
3759         if (ctgt != ptgt)
3760                 RETURN(-EREMOTE);
3761
3762         rc = md_intent_getattr_async(ptgt->ltd_exp, item);
3763
3764         RETURN(rc);
3765 }
3766
3767 static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
3768                                struct lu_fid *fid, __u64 *bits)
3769 {
3770         struct obd_device *obd = exp->exp_obd;
3771         struct lmv_obd *lmv = &obd->u.lmv;
3772         struct lmv_tgt_desc *tgt;
3773         int rc;
3774
3775         ENTRY;
3776
3777         tgt = lmv_fid2tgt(lmv, fid);
3778         if (IS_ERR(tgt))
3779                 RETURN(PTR_ERR(tgt));
3780
3781         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
3782         RETURN(rc);
3783 }
3784
3785 static int lmv_get_fid_from_lsm(struct obd_export *exp,
3786                                 const struct lmv_stripe_md *lsm,
3787                                 const char *name, int namelen,
3788                                 struct lu_fid *fid)
3789 {
3790         const struct lmv_oinfo *oinfo;
3791
3792         if (!lmv_dir_striped(lsm))
3793                 RETURN(-ESTALE);
3794
3795         oinfo = lsm_name_to_stripe_info(lsm, name, namelen, false);
3796         if (IS_ERR(oinfo))
3797                 return PTR_ERR(oinfo);
3798
3799         *fid = oinfo->lmo_fid;
3800
3801         RETURN(0);
3802 }
3803
3804 /**
3805  * For lmv, only need to send request to master MDT, and the master MDT will
3806  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
3807  * we directly fetch data from the slave MDTs.
3808  */
3809 static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
3810                         struct obd_quotactl *oqctl)
3811 {
3812         struct obd_device *obd = class_exp2obd(exp);
3813         struct lmv_obd *lmv = &obd->u.lmv;
3814         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3815         __u64 curspace, curinodes;
3816         int rc = 0;
3817
3818         ENTRY;
3819
3820         if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) {
3821                 CERROR("master lmv inactive\n");
3822                 RETURN(-EIO);
3823         }
3824
3825         if (oqctl->qc_cmd != Q_GETOQUOTA) {
3826                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
3827                 RETURN(rc);
3828         }
3829
3830         curspace = curinodes = 0;
3831         lmv_foreach_connected_tgt(lmv, tgt) {
3832                 int err;
3833
3834                 if (!tgt->ltd_active)
3835                         continue;
3836
3837                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3838                 if (err) {
3839                         CERROR("getquota on mdt %d failed. %d\n",
3840                                tgt->ltd_index, err);
3841                         if (!rc)
3842                                 rc = err;
3843                 } else {
3844                         curspace += oqctl->qc_dqblk.dqb_curspace;
3845                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3846                 }
3847         }
3848         oqctl->qc_dqblk.dqb_curspace = curspace;
3849         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3850
3851         RETURN(rc);
3852 }
3853
3854 static int lmv_merge_attr(struct obd_export *exp,
3855                           const struct lmv_stripe_md *lsm,
3856                           struct cl_attr *attr,
3857                           ldlm_blocking_callback cb_blocking)
3858 {
3859         int rc;
3860         int i;
3861
3862         if (!lmv_dir_striped(lsm))
3863                 return 0;
3864
3865         rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
3866         if (rc < 0)
3867                 return rc;
3868
3869         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3870                 struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
3871
3872                 if (!inode)
3873                         continue;
3874
3875                 CDEBUG(D_INFO,
3876                        "" DFID " size %llu, blocks %llu nlink %u, atime %lld ctime %lld, mtime %lld.\n",
3877                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
3878                        i_size_read(inode), (unsigned long long)inode->i_blocks,
3879                        inode->i_nlink, (s64)inode->i_atime.tv_sec,
3880                        (s64)inode->i_ctime.tv_sec, (s64)inode->i_mtime.tv_sec);
3881
3882                 /* for slave stripe, it needs to subtract nlink for . and .. */
3883                 if (i != 0)
3884                         attr->cat_nlink += inode->i_nlink - 2;
3885                 else
3886                         attr->cat_nlink = inode->i_nlink;
3887
3888                 attr->cat_size += i_size_read(inode);
3889                 attr->cat_blocks += inode->i_blocks;
3890
3891                 if (attr->cat_atime < inode->i_atime.tv_sec)
3892                         attr->cat_atime = inode->i_atime.tv_sec;
3893
3894                 if (attr->cat_ctime < inode->i_ctime.tv_sec)
3895                         attr->cat_ctime = inode->i_ctime.tv_sec;
3896
3897                 if (attr->cat_mtime < inode->i_mtime.tv_sec)
3898                         attr->cat_mtime = inode->i_mtime.tv_sec;
3899         }
3900         return 0;
3901 }
3902
3903 static struct lu_batch *lmv_batch_create(struct obd_export *exp,
3904                                          enum lu_batch_flags flags,
3905                                          __u32 max_count)
3906 {
3907         struct lu_batch *bh;
3908         struct lmv_batch *lbh;
3909
3910         ENTRY;
3911         OBD_ALLOC_PTR(lbh);
3912         if (!lbh)
3913                 RETURN(ERR_PTR(-ENOMEM));
3914
3915         bh = &lbh->lbh_super;
3916         bh->lbt_flags = flags;
3917         bh->lbt_max_count = max_count;
3918
3919         if (flags & BATCH_FL_RQSET) {
3920                 bh->lbt_rqset = ptlrpc_prep_set();
3921                 if (bh->lbt_rqset == NULL) {
3922                         OBD_FREE_PTR(lbh);
3923                         RETURN(ERR_PTR(-ENOMEM));
3924                 }
3925         }
3926
3927         INIT_LIST_HEAD(&lbh->lbh_sub_batch_list);
3928         RETURN(bh);
3929 }
3930
3931 static int lmv_batch_stop(struct obd_export *exp, struct lu_batch *bh)
3932 {
3933         struct lmv_batch *lbh;
3934         struct lmvsub_batch *sub;
3935         struct lmvsub_batch *tmp;
3936         int rc = 0;
3937
3938         ENTRY;
3939
3940         lbh = container_of(bh, struct lmv_batch, lbh_super);
3941         list_for_each_entry_safe(sub, tmp, &lbh->lbh_sub_batch_list,
3942                                  sbh_sub_item) {
3943                 list_del(&sub->sbh_sub_item);
3944                 rc = md_batch_stop(sub->sbh_tgt->ltd_exp, sub->sbh_sub);
3945                 if (rc < 0) {
3946                         CERROR("%s: stop batch processing failed: rc = %d\n",
3947                                exp->exp_obd->obd_name, rc);
3948                         if (bh->lbt_result == 0)
3949                                 bh->lbt_result = rc;
3950                 }
3951                 OBD_FREE_PTR(sub);
3952         }
3953
3954         if (bh->lbt_flags & BATCH_FL_RQSET) {
3955                 rc = ptlrpc_set_wait(NULL, bh->lbt_rqset);
3956                 ptlrpc_set_destroy(bh->lbt_rqset);
3957         }
3958
3959         OBD_FREE_PTR(lbh);
3960         RETURN(rc);
3961 }
3962
3963 static int lmv_batch_flush(struct obd_export *exp, struct lu_batch *bh,
3964                            bool wait)
3965 {
3966         struct lmv_batch *lbh;
3967         struct lmvsub_batch *sub;
3968         int rc = 0;
3969         int rc1;
3970
3971         ENTRY;
3972
3973         lbh = container_of(bh, struct lmv_batch, lbh_super);
3974         list_for_each_entry(sub, &lbh->lbh_sub_batch_list, sbh_sub_item) {
3975                 rc1 = md_batch_flush(sub->sbh_tgt->ltd_exp, sub->sbh_sub, wait);
3976                 if (rc1 < 0) {
3977                         CERROR("%s: stop batch processing failed: rc = %d\n",
3978                                exp->exp_obd->obd_name, rc);
3979                         if (bh->lbt_result == 0)
3980                                 bh->lbt_result = rc;
3981
3982                         if (rc == 0)
3983                                 rc = rc1;
3984                 }
3985         }
3986
3987         if (wait && bh->lbt_flags & BATCH_FL_RQSET) {
3988                 rc1 = ptlrpc_set_wait(NULL, bh->lbt_rqset);
3989                 if (rc == 0)
3990                         rc = rc1;
3991         }
3992
3993         RETURN(rc);
3994 }
3995
3996 static inline struct lmv_tgt_desc *
3997 lmv_batch_locate_tgt(struct lmv_obd *lmv, struct md_op_item *item)
3998 {
3999         struct lmv_tgt_desc *tgt;
4000
4001         switch (item->mop_opc) {
4002         default:
4003                 tgt = ERR_PTR(-ENOTSUPP);
4004         }
4005
4006         return tgt;
4007 }
4008
4009 struct lu_batch *lmv_batch_lookup_sub(struct lmv_batch *lbh,
4010                                       struct lmv_tgt_desc *tgt)
4011 {
4012         struct lmvsub_batch *sub;
4013
4014         list_for_each_entry(sub, &lbh->lbh_sub_batch_list, sbh_sub_item) {
4015                 if (sub->sbh_tgt == tgt)
4016                         return sub->sbh_sub;
4017         }
4018
4019         return NULL;
4020 }
4021
4022 struct lu_batch *lmv_batch_get_sub(struct lmv_batch *lbh,
4023                                    struct lmv_tgt_desc *tgt)
4024 {
4025         struct lmvsub_batch *sbh;
4026         struct lu_batch *child_bh;
4027         struct lu_batch *bh;
4028
4029         ENTRY;
4030
4031         child_bh = lmv_batch_lookup_sub(lbh, tgt);
4032         if (child_bh != NULL)
4033                 RETURN(child_bh);
4034
4035         OBD_ALLOC_PTR(sbh);
4036         if (sbh == NULL)
4037                 RETURN(ERR_PTR(-ENOMEM));
4038
4039         INIT_LIST_HEAD(&sbh->sbh_sub_item);
4040         sbh->sbh_tgt = tgt;
4041
4042         bh = &lbh->lbh_super;
4043         child_bh = md_batch_create(tgt->ltd_exp, bh->lbt_flags,
4044                                    bh->lbt_max_count);
4045         if (IS_ERR(child_bh)) {
4046                 OBD_FREE_PTR(sbh);
4047                 RETURN(child_bh);
4048         }
4049
4050         child_bh->lbt_rqset = bh->lbt_rqset;
4051         sbh->sbh_sub = child_bh;
4052         list_add(&sbh->sbh_sub_item, &lbh->lbh_sub_batch_list);
4053         RETURN(child_bh);
4054 }
4055
4056 static int lmv_batch_add(struct obd_export *exp, struct lu_batch *bh,
4057                          struct md_op_item *item)
4058 {
4059         struct obd_device *obd = exp->exp_obd;
4060         struct lmv_obd *lmv = &obd->u.lmv;
4061         struct lmv_tgt_desc *tgt;
4062         struct lmv_batch *lbh;
4063         struct lu_batch *child_bh;
4064         int rc;
4065
4066         ENTRY;
4067
4068         tgt = lmv_batch_locate_tgt(lmv, item);
4069         if (IS_ERR(tgt))
4070                 RETURN(PTR_ERR(tgt));
4071
4072         lbh = container_of(bh, struct lmv_batch, lbh_super);
4073         child_bh = lmv_batch_get_sub(lbh, tgt);
4074         if (IS_ERR(child_bh))
4075                 RETURN(PTR_ERR(child_bh));
4076
4077         rc = md_batch_add(tgt->ltd_exp, child_bh, item);
4078         RETURN(rc);
4079 }
4080
/*
 * Generic OBD device operations implemented by LMV; registered together with
 * lmv_md_ops in lmv_init().
 */
static const struct obd_ops lmv_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = lmv_setup,
        .o_cleanup              = lmv_cleanup,
        .o_precleanup           = lmv_precleanup,
        .o_process_config       = lmv_process_config,
        .o_connect              = lmv_connect,
        .o_disconnect           = lmv_disconnect,
        .o_statfs               = lmv_statfs,
        .o_get_info             = lmv_get_info,
        .o_set_info_async       = lmv_set_info_async,
        .o_notify               = lmv_notify,
        .o_get_uuid             = lmv_get_uuid,
        .o_fid_alloc            = lmv_fid_alloc,
        .o_iocontrol            = lmv_iocontrol,
        .o_quotactl             = lmv_quotactl
};
4098
/*
 * Metadata operations implemented by LMV; registered together with
 * lmv_obd_ops in lmv_init().
 */
static const struct md_ops lmv_md_ops = {
        .m_get_root             = lmv_get_root,
        .m_null_inode           = lmv_null_inode,
        .m_close                = lmv_close,
        .m_create               = lmv_create,
        .m_enqueue              = lmv_enqueue,
        .m_getattr              = lmv_getattr,
        .m_getxattr             = lmv_getxattr,
        .m_getattr_name         = lmv_getattr_name,
        .m_intent_lock          = lmv_intent_lock,
        .m_link                 = lmv_link,
        .m_rename               = lmv_rename,
        .m_setattr              = lmv_setattr,
        .m_setxattr             = lmv_setxattr,
        .m_fsync                = lmv_fsync,
        .m_file_resync          = lmv_file_resync,
        .m_read_page            = lmv_read_page,
        .m_unlink               = lmv_unlink,
        .m_init_ea_size         = lmv_init_ea_size,
        .m_cancel_unused        = lmv_cancel_unused,
        .m_set_lock_data        = lmv_set_lock_data,
        .m_lock_match           = lmv_lock_match,
        .m_get_lustre_md        = lmv_get_lustre_md,
        .m_free_lustre_md       = lmv_free_lustre_md,
        .m_merge_attr           = lmv_merge_attr,
        .m_set_open_replay_data = lmv_set_open_replay_data,
        .m_clear_open_replay_data = lmv_clear_open_replay_data,
        .m_intent_getattr_async = lmv_intent_getattr_async,
        .m_revalidate_lock      = lmv_revalidate_lock,
        .m_get_fid_from_lsm     = lmv_get_fid_from_lsm,
        .m_unpackmd             = lmv_unpackmd,
        .m_rmfid                = lmv_rmfid,
        /* batched RPC support */
        .m_batch_create         = lmv_batch_create,
        .m_batch_add            = lmv_batch_add,
        .m_batch_stop           = lmv_batch_stop,
        .m_batch_flush          = lmv_batch_flush,
};
4136
4137 static int __init lmv_init(void)
4138 {
4139         return class_register_type(&lmv_obd_ops, &lmv_md_ops, true,
4140                                    LUSTRE_LMV_NAME, NULL);
4141 }
4142
4143 static void __exit lmv_exit(void)
4144 {
4145         class_unregister_type(LUSTRE_LMV_NAME);
4146 }
4147
4148 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
4149 MODULE_DESCRIPTION("Lustre Logical Metadata Volume");
4150 MODULE_VERSION(LUSTRE_VERSION_STRING);
4151 MODULE_LICENSE("GPL");
4152
4153 module_init(lmv_init);
4154 module_exit(lmv_exit);