Whamcloud - gitweb
9290f082a13990c2d324389dbcaa3b3b61057896
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_LMV
33
34 #include <linux/file.h>
35 #include <linux/module.h>
36 #include <linux/init.h>
37 #include <linux/user_namespace.h>
38 #include <linux/uidgid.h>
39 #include <linux/slab.h>
40 #include <linux/pagemap.h>
41 #include <linux/mm.h>
42 #include <linux/math64.h>
43 #include <linux/seq_file.h>
44 #include <linux/namei.h>
45
46 #include <obd_support.h>
47 #include <lustre_lib.h>
48 #include <lustre_net.h>
49 #include <obd_class.h>
50 #include <lustre_lmv.h>
51 #include <lprocfs_status.h>
52 #include <cl_object.h>
53 #include <lustre_fid.h>
54 #include <uapi/linux/lustre/lustre_ioctl.h>
55 #include <lustre_kernelcomm.h>
56 #include "lmv_internal.h"
57
58 static int lmv_check_connect(struct obd_device *obd);
59 static inline bool lmv_op_default_rr_mkdir(const struct md_op_data *op_data);
60
61 void lmv_activate_target(struct lmv_obd *lmv, struct lmv_tgt_desc *tgt,
62                          int activate)
63 {
64         if (tgt->ltd_active == activate)
65                 return;
66
67         tgt->ltd_active = activate;
68         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count +=
69                 (activate ? 1 : -1);
70
71         tgt->ltd_exp->exp_obd->obd_inactive = !activate;
72 }
73
74 /**
75  * Error codes:
76  *
77  *  -EINVAL  : UUID can't be found in the LMV's target list
78  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
79  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
80  */
81 static int lmv_set_mdc_active(struct lmv_obd *lmv,
82                               const struct obd_uuid *uuid,
83                               int activate)
84 {
85         struct lu_tgt_desc *tgt = NULL;
86         struct obd_device *obd;
87         int rc = 0;
88
89         ENTRY;
90
91         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
92                         lmv, uuid->uuid, activate);
93
94         spin_lock(&lmv->lmv_lock);
95         lmv_foreach_connected_tgt(lmv, tgt) {
96                 CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n",
97                        tgt->ltd_index, tgt->ltd_uuid.uuid,
98                        tgt->ltd_exp->exp_handle.h_cookie);
99
100                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
101                         break;
102         }
103
104         if (!tgt)
105                 GOTO(out_lmv_lock, rc = -EINVAL);
106
107         obd = class_exp2obd(tgt->ltd_exp);
108         if (obd == NULL)
109                 GOTO(out_lmv_lock, rc = -ENOTCONN);
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, tgt->ltd_index);
114         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115
116         if (tgt->ltd_active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 GOTO(out_lmv_lock, rc);
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123                activate ? "" : "in");
124         lmv_activate_target(lmv, tgt, activate);
125         EXIT;
126
127  out_lmv_lock:
128         spin_unlock(&lmv->lmv_lock);
129         return rc;
130 }
131
132 static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
133 {
134         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
135         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
136
137         return tgt ? obd_get_uuid(tgt->ltd_exp) : NULL;
138 }
139
140 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
141                       enum obd_notify_event ev)
142 {
143         struct obd_connect_data *conn_data;
144         struct lmv_obd          *lmv = &obd->u.lmv;
145         struct obd_uuid         *uuid;
146         int                      rc = 0;
147         ENTRY;
148
149         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
150                 CERROR("unexpected notification of %s %s!\n",
151                        watched->obd_type->typ_name,
152                        watched->obd_name);
153                 RETURN(-EINVAL);
154         }
155
156         uuid = &watched->u.cli.cl_target_uuid;
157         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
158                 /*
159                  * Set MDC as active before notifying the observer, so the
160                  * observer can use the MDC normally.
161                  */
162                 rc = lmv_set_mdc_active(lmv, uuid,
163                                         ev == OBD_NOTIFY_ACTIVE);
164                 if (rc) {
165                         CERROR("%sactivation of %s failed: %d\n",
166                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
167                                uuid->uuid, rc);
168                         RETURN(rc);
169                 }
170         } else if (ev == OBD_NOTIFY_OCD) {
171                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
172                 /*
173                  * XXX: Make sure that ocd_connect_flags from all targets are
174                  * the same. Otherwise one of MDTs runs wrong version or
175                  * something like this.  --umka
176                  */
177                 obd->obd_self_export->exp_connect_data = *conn_data;
178         }
179
180         /*
181          * Pass the notification up the chain.
182          */
183         if (obd->obd_observer)
184                 rc = obd_notify(obd->obd_observer, watched, ev);
185
186         RETURN(rc);
187 }
188
189 static int lmv_connect(const struct lu_env *env,
190                        struct obd_export **pexp, struct obd_device *obd,
191                        struct obd_uuid *cluuid, struct obd_connect_data *data,
192                        void *localdata)
193 {
194         struct lmv_obd *lmv = &obd->u.lmv;
195         struct lustre_handle conn = { 0 };
196         struct obd_export *exp;
197         int rc;
198         ENTRY;
199
200         rc = class_connect(&conn, obd, cluuid);
201         if (rc) {
202                 CERROR("class_connection() returned %d\n", rc);
203                 RETURN(rc);
204         }
205
206         exp = class_conn2export(&conn);
207
208         lmv->connected = 0;
209         lmv->conn_data = *data;
210         lmv->lmv_cache = localdata;
211
212         lmv->lmv_tgts_kobj = kobject_create_and_add("target_obds",
213                                                     &obd->obd_kset.kobj);
214         if (!lmv->lmv_tgts_kobj) {
215                 CERROR("%s: cannot create /sys/fs/lustre/%s/%s/target_obds\n",
216                        obd->obd_name, obd->obd_type->typ_name, obd->obd_name);
217         }
218
219         rc = lmv_check_connect(obd);
220         if (rc != 0)
221                 GOTO(out_sysfs, rc);
222
223         *pexp = exp;
224
225         RETURN(rc);
226
227 out_sysfs:
228         if (lmv->lmv_tgts_kobj)
229                 kobject_put(lmv->lmv_tgts_kobj);
230
231         class_disconnect(exp);
232
233         return rc;
234 }
235
236 static int lmv_init_ea_size(struct obd_export *exp, __u32 easize,
237                             __u32 def_easize)
238 {
239         struct obd_device *obd = exp->exp_obd;
240         struct lmv_obd *lmv = &obd->u.lmv;
241         struct lmv_tgt_desc *tgt;
242         int change = 0;
243         int rc = 0;
244
245         ENTRY;
246
247         if (lmv->max_easize < easize) {
248                 lmv->max_easize = easize;
249                 change = 1;
250         }
251         if (lmv->max_def_easize < def_easize) {
252                 lmv->max_def_easize = def_easize;
253                 change = 1;
254         }
255
256         if (change == 0)
257                 RETURN(0);
258
259         if (lmv->connected == 0)
260                 RETURN(0);
261
262         lmv_foreach_connected_tgt(lmv, tgt) {
263                 if (!tgt->ltd_active)
264                         continue;
265
266                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize);
267                 if (rc) {
268                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
269                                " rc = %d\n", obd->obd_name, tgt->ltd_index, rc);
270                         break;
271                 }
272         }
273         RETURN(rc);
274 }
275
276 #define MAX_STRING_SIZE 128
277
278 static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
279 {
280         struct lmv_obd *lmv = &obd->u.lmv;
281         struct obd_device *mdc_obd;
282         struct obd_export *mdc_exp;
283         struct lu_fld_target target;
284         int  rc;
285         ENTRY;
286
287         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
288                                         &obd->obd_uuid);
289         if (!mdc_obd) {
290                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
291                 RETURN(-EINVAL);
292         }
293
294         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s\n",
295                mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
296                tgt->ltd_uuid.uuid, obd->obd_uuid.uuid);
297
298         if (!mdc_obd->obd_set_up) {
299                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
300                 RETURN(-EINVAL);
301         }
302
303         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &obd->obd_uuid,
304                          &lmv->conn_data, lmv->lmv_cache);
305         if (rc) {
306                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
307                 RETURN(rc);
308         }
309
310         /*
311          * Init fid sequence client for this mdc and add new fld target.
312          */
313         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
314         if (rc)
315                 RETURN(rc);
316
317         target.ft_srv = NULL;
318         target.ft_exp = mdc_exp;
319         target.ft_idx = tgt->ltd_index;
320
321         fld_client_add_target(&lmv->lmv_fld, &target);
322
323         rc = obd_register_observer(mdc_obd, obd);
324         if (rc) {
325                 obd_disconnect(mdc_exp);
326                 CERROR("target %s register_observer error %d\n",
327                        tgt->ltd_uuid.uuid, rc);
328                 RETURN(rc);
329         }
330
331         if (obd->obd_observer) {
332                 /*
333                  * Tell the observer about the new target.
334                  */
335                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
336                                 OBD_NOTIFY_ACTIVE);
337                 if (rc) {
338                         obd_disconnect(mdc_exp);
339                         RETURN(rc);
340                 }
341         }
342
343         tgt->ltd_active = 1;
344         tgt->ltd_exp = mdc_exp;
345         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count++;
346
347         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize);
348
349         rc = lu_qos_add_tgt(&lmv->lmv_qos, tgt);
350         if (rc) {
351                 obd_disconnect(mdc_exp);
352                 RETURN(rc);
353         }
354
355         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
356                mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
357                atomic_read(&obd->obd_refcount));
358
359         lmv_statfs_check_update(obd, tgt);
360
361         if (lmv->lmv_tgts_kobj)
362                 /* Even if we failed to create the link, that's fine */
363                 rc = sysfs_create_link(lmv->lmv_tgts_kobj,
364                                        &mdc_obd->obd_kset.kobj,
365                                        mdc_obd->obd_name);
366         RETURN(0);
367 }
368
369 static void lmv_del_target(struct lmv_obd *lmv, struct lu_tgt_desc *tgt)
370 {
371         LASSERT(tgt);
372         ltd_del_tgt(&lmv->lmv_mdt_descs, tgt);
373         OBD_FREE_PTR(tgt);
374 }
375
376 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
377                            __u32 index, int gen)
378 {
379         struct obd_device *mdc_obd;
380         struct lmv_obd *lmv = &obd->u.lmv;
381         struct lmv_tgt_desc *tgt;
382         struct lu_tgt_descs *ltd = &lmv->lmv_mdt_descs;
383         int rc = 0;
384
385         ENTRY;
386
387         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
388         mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
389                                         &obd->obd_uuid);
390         if (!mdc_obd) {
391                 CERROR("%s: Target %s not attached: rc = %d\n",
392                        obd->obd_name, uuidp->uuid, -EINVAL);
393                 RETURN(-EINVAL);
394         }
395
396         OBD_ALLOC_PTR(tgt);
397         if (!tgt)
398                 RETURN(-ENOMEM);
399
400         mutex_init(&tgt->ltd_fid_mutex);
401         tgt->ltd_index = index;
402         tgt->ltd_uuid = *uuidp;
403         tgt->ltd_active = 0;
404
405         mutex_lock(&ltd->ltd_mutex);
406         rc = ltd_add_tgt(ltd, tgt);
407         mutex_unlock(&ltd->ltd_mutex);
408
409         if (rc)
410                 GOTO(out_tgt, rc);
411
412         if (!lmv->connected)
413                 /* lmv_check_connect() will connect this target. */
414                 RETURN(0);
415
416         rc = lmv_connect_mdc(obd, tgt);
417         if (!rc) {
418                 int easize = sizeof(struct lmv_stripe_md) +
419                         lmv->lmv_mdt_count * sizeof(struct lu_fid);
420
421                 lmv_init_ea_size(obd->obd_self_export, easize, 0);
422         }
423
424         RETURN(rc);
425
426 out_tgt:
427         OBD_FREE_PTR(tgt);
428         return rc;
429 }
430
431 static int lmv_check_connect(struct obd_device *obd)
432 {
433         struct lmv_obd *lmv = &obd->u.lmv;
434         struct lmv_tgt_desc *tgt;
435         int easize;
436         int rc;
437
438         ENTRY;
439
440         if (lmv->connected)
441                 RETURN(0);
442
443         mutex_lock(&lmv->lmv_mdt_descs.ltd_mutex);
444         if (lmv->connected)
445                 GOTO(unlock, rc = 0);
446
447         if (!lmv->lmv_mdt_count) {
448                 CERROR("%s: no targets configured: rc = -EINVAL\n",
449                        obd->obd_name);
450                 GOTO(unlock, rc = -EINVAL);
451         }
452
453         if (!lmv_mdt0_inited(lmv)) {
454                 CERROR("%s: no target configured for index 0: rc = -EINVAL.\n",
455                        obd->obd_name);
456                 GOTO(unlock, rc = -EINVAL);
457         }
458
459         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
460                obd->obd_uuid.uuid, obd->obd_name);
461
462         lmv_foreach_tgt(lmv, tgt) {
463                 rc = lmv_connect_mdc(obd, tgt);
464                 if (rc)
465                         GOTO(out_disc, rc);
466         }
467
468         lmv->connected = 1;
469         easize = lmv_mds_md_size(lmv->lmv_mdt_count, LMV_MAGIC);
470         lmv_init_ea_size(obd->obd_self_export, easize, 0);
471         EXIT;
472 unlock:
473         mutex_unlock(&lmv->lmv_mdt_descs.ltd_mutex);
474
475         return rc;
476
477 out_disc:
478         lmv_foreach_tgt(lmv, tgt) {
479                 tgt->ltd_active = 0;
480                 if (!tgt->ltd_exp)
481                         continue;
482
483                 --lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count;
484                 obd_disconnect(tgt->ltd_exp);
485         }
486
487         goto unlock;
488 }
489
490 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
491 {
492         struct lmv_obd *lmv = &obd->u.lmv;
493         struct obd_device *mdc_obd;
494         int rc;
495         ENTRY;
496
497         LASSERT(tgt != NULL);
498         LASSERT(obd != NULL);
499
500         mdc_obd = class_exp2obd(tgt->ltd_exp);
501
502         if (mdc_obd) {
503                 mdc_obd->obd_force = obd->obd_force;
504                 mdc_obd->obd_fail = obd->obd_fail;
505                 mdc_obd->obd_no_recov = obd->obd_no_recov;
506
507                 if (lmv->lmv_tgts_kobj)
508                         sysfs_remove_link(lmv->lmv_tgts_kobj,
509                                           mdc_obd->obd_name);
510         }
511
512         rc = lu_qos_del_tgt(&lmv->lmv_qos, tgt);
513         if (rc)
514                 CERROR("%s: Can't del target from QoS table: rc = %d\n",
515                        tgt->ltd_exp->exp_obd->obd_name, rc);
516
517         rc = fld_client_del_target(&lmv->lmv_fld, tgt->ltd_index);
518         if (rc)
519                 CERROR("%s: Can't del fld targets: rc = %d\n",
520                        tgt->ltd_exp->exp_obd->obd_name, rc);
521
522         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
523         if (rc)
524                 CERROR("%s: Can't finalize fids factory: rc = %d\n",
525                        tgt->ltd_exp->exp_obd->obd_name, rc);
526
527         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
528                tgt->ltd_exp->exp_obd->obd_name,
529                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
530
531         lmv_activate_target(lmv, tgt, 0);
532         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
533         rc = obd_disconnect(tgt->ltd_exp);
534         if (rc) {
535                 CERROR("%s: Target %s disconnect error: rc = %d\n",
536                        tgt->ltd_exp->exp_obd->obd_name,
537                        tgt->ltd_uuid.uuid, rc);
538         }
539         tgt->ltd_exp = NULL;
540         RETURN(0);
541 }
542
543 static int lmv_disconnect(struct obd_export *exp)
544 {
545         struct obd_device *obd = class_exp2obd(exp);
546         struct lmv_obd *lmv = &obd->u.lmv;
547         struct lmv_tgt_desc *tgt;
548         int rc;
549
550         ENTRY;
551
552         lmv_foreach_connected_tgt(lmv, tgt)
553                 lmv_disconnect_mdc(obd, tgt);
554
555         if (lmv->lmv_tgts_kobj)
556                 kobject_put(lmv->lmv_tgts_kobj);
557
558         lmv->connected = 0;
559         rc = class_disconnect(exp);
560
561         RETURN(rc);
562 }
563
564 static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
565                         void __user *uarg)
566 {
567         struct obd_device *obd = class_exp2obd(exp);
568         struct lmv_obd *lmv = &obd->u.lmv;
569         struct getinfo_fid2path *gf;
570         struct lmv_tgt_desc *tgt;
571         struct getinfo_fid2path *remote_gf = NULL;
572         struct lu_fid root_fid;
573         int remote_gf_size = 0;
574         int rc;
575
576         gf = karg;
577         tgt = lmv_fid2tgt(lmv, &gf->gf_fid);
578         if (IS_ERR(tgt))
579                 RETURN(PTR_ERR(tgt));
580
581         root_fid = *gf->gf_u.gf_root_fid;
582         LASSERT(fid_is_sane(&root_fid));
583
584 repeat_fid2path:
585         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
586         if (rc != 0 && rc != -EREMOTE)
587                 GOTO(out_fid2path, rc);
588
589         /* If remote_gf != NULL, it means just building the
590          * path on the remote MDT, copy this path segement to gf */
591         if (remote_gf != NULL) {
592                 struct getinfo_fid2path *ori_gf;
593                 char *ptr;
594                 int len;
595
596                 ori_gf = (struct getinfo_fid2path *)karg;
597                 if (strlen(ori_gf->gf_u.gf_path) + 1 +
598                     strlen(gf->gf_u.gf_path) + 1 > ori_gf->gf_pathlen)
599                         GOTO(out_fid2path, rc = -EOVERFLOW);
600
601                 ptr = ori_gf->gf_u.gf_path;
602
603                 len = strlen(gf->gf_u.gf_path);
604                 /* move the current path to the right to release space
605                  * for closer-to-root part */
606                 memmove(ptr + len + 1, ptr, strlen(ori_gf->gf_u.gf_path));
607                 memcpy(ptr, gf->gf_u.gf_path, len);
608                 ptr[len] = '/';
609         }
610
611         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
612                tgt->ltd_exp->exp_obd->obd_name,
613                gf->gf_u.gf_path, PFID(&gf->gf_fid), gf->gf_recno,
614                gf->gf_linkno);
615
616         if (rc == 0)
617                 GOTO(out_fid2path, rc);
618
619         /* sigh, has to go to another MDT to do path building further */
620         if (remote_gf == NULL) {
621                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
622                 OBD_ALLOC(remote_gf, remote_gf_size);
623                 if (remote_gf == NULL)
624                         GOTO(out_fid2path, rc = -ENOMEM);
625                 remote_gf->gf_pathlen = PATH_MAX;
626         }
627
628         if (!fid_is_sane(&gf->gf_fid)) {
629                 CERROR("%s: invalid FID "DFID": rc = %d\n",
630                        tgt->ltd_exp->exp_obd->obd_name,
631                        PFID(&gf->gf_fid), -EINVAL);
632                 GOTO(out_fid2path, rc = -EINVAL);
633         }
634
635         tgt = lmv_fid2tgt(lmv, &gf->gf_fid);
636         if (IS_ERR(tgt))
637                 GOTO(out_fid2path, rc = -EINVAL);
638
639         remote_gf->gf_fid = gf->gf_fid;
640         remote_gf->gf_recno = -1;
641         remote_gf->gf_linkno = -1;
642         memset(remote_gf->gf_u.gf_path, 0, remote_gf->gf_pathlen);
643         *remote_gf->gf_u.gf_root_fid = root_fid;
644         gf = remote_gf;
645         goto repeat_fid2path;
646
647 out_fid2path:
648         if (remote_gf != NULL)
649                 OBD_FREE(remote_gf, remote_gf_size);
650         RETURN(rc);
651 }
652
653 static int lmv_hsm_req_count(struct lmv_obd *lmv,
654                              const struct hsm_user_request *hur,
655                              const struct lmv_tgt_desc *tgt_mds)
656 {
657         struct lmv_tgt_desc *curr_tgt;
658         __u32 i;
659         int nr = 0;
660
661         /* count how many requests must be sent to the given target */
662         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
663                 curr_tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[i].hui_fid);
664                 if (IS_ERR(curr_tgt))
665                         RETURN(PTR_ERR(curr_tgt));
666                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
667                         nr++;
668         }
669         return nr;
670 }
671
672 static int lmv_hsm_req_build(struct lmv_obd *lmv,
673                               struct hsm_user_request *hur_in,
674                               const struct lmv_tgt_desc *tgt_mds,
675                               struct hsm_user_request *hur_out)
676 {
677         __u32 i, nr_out;
678         struct lmv_tgt_desc *curr_tgt;
679
680         /* build the hsm_user_request for the given target */
681         hur_out->hur_request = hur_in->hur_request;
682         nr_out = 0;
683         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
684                 curr_tgt = lmv_fid2tgt(lmv, &hur_in->hur_user_item[i].hui_fid);
685                 if (IS_ERR(curr_tgt))
686                         RETURN(PTR_ERR(curr_tgt));
687                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
688                         hur_out->hur_user_item[nr_out] =
689                                                 hur_in->hur_user_item[i];
690                         nr_out++;
691                 }
692         }
693         hur_out->hur_request.hr_itemcount = nr_out;
694         memcpy(hur_data(hur_out), hur_data(hur_in),
695                hur_in->hur_request.hr_data_len);
696
697         RETURN(0);
698 }
699
700 static int lmv_hsm_ct_unregister(struct obd_device *obd, unsigned int cmd,
701                                  int len, struct lustre_kernelcomm *lk,
702                                  void __user *uarg)
703 {
704         struct lmv_obd *lmv = &obd->u.lmv;
705         struct lu_tgt_desc *tgt;
706         int rc;
707
708         ENTRY;
709
710         /* unregister request (call from llapi_hsm_copytool_fini) */
711         lmv_foreach_connected_tgt(lmv, tgt)
712                 /* best effort: try to clean as much as possible
713                  * (continue on error) */
714                 obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
715
716         /* Whatever the result, remove copytool from kuc groups.
717          * Unreached coordinators will get EPIPE on next requests
718          * and will unregister automatically.
719          */
720         rc = libcfs_kkuc_group_rem(&obd->obd_uuid, lk->lk_uid, lk->lk_group);
721
722         RETURN(rc);
723 }
724
725 static int lmv_hsm_ct_register(struct obd_device *obd, unsigned int cmd,
726                                int len, struct lustre_kernelcomm *lk,
727                                void __user *uarg)
728 {
729         struct lmv_obd *lmv = &obd->u.lmv;
730         struct file *filp;
731         bool any_set = false;
732         struct kkuc_ct_data *kcd;
733         size_t kcd_size;
734         struct lu_tgt_desc *tgt;
735         __u32 i;
736         int err;
737         int rc = 0;
738
739         ENTRY;
740
741         filp = fget(lk->lk_wfd);
742         if (!filp)
743                 RETURN(-EBADF);
744
745         if (lk->lk_flags & LK_FLG_DATANR)
746                 kcd_size = offsetof(struct kkuc_ct_data,
747                                     kcd_archives[lk->lk_data_count]);
748         else
749                 kcd_size = sizeof(*kcd);
750
751         OBD_ALLOC(kcd, kcd_size);
752         if (kcd == NULL)
753                 GOTO(err_fput, rc = -ENOMEM);
754
755         kcd->kcd_nr_archives = lk->lk_data_count;
756         if (lk->lk_flags & LK_FLG_DATANR) {
757                 kcd->kcd_magic = KKUC_CT_DATA_ARRAY_MAGIC;
758                 if (lk->lk_data_count > 0)
759                         memcpy(kcd->kcd_archives, lk->lk_data,
760                                sizeof(*kcd->kcd_archives) * lk->lk_data_count);
761         } else {
762                 kcd->kcd_magic = KKUC_CT_DATA_BITMAP_MAGIC;
763         }
764
765         rc = libcfs_kkuc_group_add(filp, &obd->obd_uuid, lk->lk_uid,
766                                    lk->lk_group, kcd, kcd_size);
767         OBD_FREE(kcd, kcd_size);
768         if (rc)
769                 GOTO(err_fput, rc);
770
771         /* All or nothing: try to register to all MDS.
772          * In case of failure, unregister from previous MDS,
773          * except if it because of inactive target. */
774         lmv_foreach_connected_tgt(lmv, tgt) {
775                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
776                 if (err) {
777                         if (tgt->ltd_active) {
778                                 /* permanent error */
779                                 CERROR("%s: iocontrol MDC %s on MDT"
780                                        " idx %d cmd %x: err = %d\n",
781                                        lmv2obd_dev(lmv)->obd_name,
782                                        tgt->ltd_uuid.uuid, tgt->ltd_index, cmd,
783                                        err);
784                                 rc = err;
785                                 lk->lk_flags |= LK_FLG_STOP;
786                                 i = tgt->ltd_index;
787                                 /* unregister from previous MDS */
788                                 lmv_foreach_connected_tgt(lmv, tgt) {
789                                         if (tgt->ltd_index >= i)
790                                                 break;
791
792                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
793                                                       lk, uarg);
794                                 }
795                                 GOTO(err_kkuc_rem, rc);
796                         }
797                         /* else: transient error.
798                          * kuc will register to the missing MDT
799                          * when it is back */
800                 } else {
801                         any_set = true;
802                 }
803         }
804
805         if (!any_set)
806                 /* no registration done: return error */
807                 GOTO(err_kkuc_rem, rc = -ENOTCONN);
808
809         RETURN(0);
810
811 err_kkuc_rem:
812         libcfs_kkuc_group_rem(&obd->obd_uuid, lk->lk_uid, lk->lk_group);
813
814 err_fput:
815         fput(filp);
816         return rc;
817 }
818
819 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
820                          int len, void *karg, void __user *uarg)
821 {
822         struct obd_device *obd = class_exp2obd(exp);
823         struct lmv_obd *lmv = &obd->u.lmv;
824         struct lu_tgt_desc *tgt = NULL;
825         int set = 0;
826         __u32 count = lmv->lmv_mdt_count;
827         int rc = 0;
828
829         ENTRY;
830
831         if (count == 0)
832                 RETURN(-ENOTTY);
833
834         switch (cmd) {
835         case IOC_OBD_STATFS: {
836                 struct obd_ioctl_data *data = karg;
837                 struct obd_device *mdc_obd;
838                 struct obd_statfs stat_buf = {0};
839                 __u32 index;
840
841                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
842
843                 if (index >= lmv->lmv_mdt_descs.ltd_tgts_size)
844                         RETURN(-ENODEV);
845
846                 tgt = lmv_tgt(lmv, index);
847                 if (!tgt)
848                         RETURN(-EAGAIN);
849
850                 if (!tgt->ltd_active)
851                         RETURN(-ENODATA);
852
853                 mdc_obd = class_exp2obd(tgt->ltd_exp);
854                 if (!mdc_obd)
855                         RETURN(-EINVAL);
856
857                 /* copy UUID */
858                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
859                                  min((int) data->ioc_plen2,
860                                      (int) sizeof(struct obd_uuid))))
861                         RETURN(-EFAULT);
862
863                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
864                                 ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS,
865                                 0);
866                 if (rc)
867                         RETURN(rc);
868                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
869                                  min((int) data->ioc_plen1,
870                                      (int) sizeof(stat_buf))))
871                         RETURN(-EFAULT);
872                 break;
873         }
874         case OBD_IOC_QUOTACTL: {
875                 struct if_quotactl *qctl = karg;
876                 struct obd_quotactl *oqctl;
877                 struct obd_import *imp;
878
879                 if (qctl->qc_valid == QC_MDTIDX) {
880                         tgt = lmv_tgt(lmv, qctl->qc_idx);
881                 } else if (qctl->qc_valid == QC_UUID) {
882                         lmv_foreach_tgt(lmv, tgt) {
883                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
884                                                      &qctl->obd_uuid))
885                                         continue;
886
887                                 if (!tgt->ltd_exp)
888                                         RETURN(-EINVAL);
889
890                                 break;
891                         }
892                 } else {
893                         RETURN(-EINVAL);
894                 }
895
896                 if (!tgt)
897                         RETURN(-ENODEV);
898
899                 if (!tgt->ltd_exp)
900                         RETURN(-EINVAL);
901
902                 imp = class_exp2cliimp(tgt->ltd_exp);
903                 if (!tgt->ltd_active && imp->imp_state != LUSTRE_IMP_IDLE) {
904                         qctl->qc_valid = QC_MDTIDX;
905                         qctl->obd_uuid = tgt->ltd_uuid;
906                         RETURN(-ENODATA);
907                 }
908
909                 OBD_ALLOC_PTR(oqctl);
910                 if (!oqctl)
911                         RETURN(-ENOMEM);
912
913                 QCTL_COPY(oqctl, qctl);
914                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
915                 if (rc == 0) {
916                         QCTL_COPY(qctl, oqctl);
917                         qctl->qc_valid = QC_MDTIDX;
918                         qctl->obd_uuid = tgt->ltd_uuid;
919                 }
920                 OBD_FREE_PTR(oqctl);
921                 break;
922         }
923         case LL_IOC_GET_CONNECT_FLAGS: {
924                 tgt = lmv_tgt(lmv, 0);
925                 rc = -ENODATA;
926                 if (tgt && tgt->ltd_exp)
927                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
928                 break;
929         }
930         case LL_IOC_FID2MDTIDX: {
931                 struct lu_fid *fid = karg;
932                 int             mdt_index;
933
934                 rc = lmv_fld_lookup(lmv, fid, &mdt_index);
935                 if (rc != 0)
936                         RETURN(rc);
937
938                 /* Note: this is from llite(see ll_dir_ioctl()), @uarg does not
939                  * point to user space memory for FID2MDTIDX. */
940                 *(__u32 *)uarg = mdt_index;
941                 break;
942         }
943         case OBD_IOC_FID2PATH: {
944                 rc = lmv_fid2path(exp, len, karg, uarg);
945                 break;
946         }
947         case LL_IOC_HSM_STATE_GET:
948         case LL_IOC_HSM_STATE_SET:
949         case LL_IOC_HSM_ACTION: {
950                 struct md_op_data *op_data = karg;
951
952                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
953                 if (IS_ERR(tgt))
954                         RETURN(PTR_ERR(tgt));
955
956                 if (tgt->ltd_exp == NULL)
957                         RETURN(-EINVAL);
958
959                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
960                 break;
961         }
962         case LL_IOC_HSM_PROGRESS: {
963                 const struct hsm_progress_kernel *hpk = karg;
964
965                 tgt = lmv_fid2tgt(lmv, &hpk->hpk_fid);
966                 if (IS_ERR(tgt))
967                         RETURN(PTR_ERR(tgt));
968                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
969                 break;
970         }
971         case LL_IOC_HSM_REQUEST: {
972                 struct hsm_user_request *hur = karg;
973                 unsigned int reqcount = hur->hur_request.hr_itemcount;
974
975                 if (reqcount == 0)
976                         RETURN(0);
977
978                 /* if the request is about a single fid
979                  * or if there is a single MDS, no need to split
980                  * the request. */
981                 if (reqcount == 1 || count == 1) {
982                         tgt = lmv_fid2tgt(lmv, &hur->hur_user_item[0].hui_fid);
983                         if (IS_ERR(tgt))
984                                 RETURN(PTR_ERR(tgt));
985                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
986                 } else {
987                         /* split fid list to their respective MDS */
988                         lmv_foreach_connected_tgt(lmv, tgt) {
989                                 int nr, rc1;
990                                 size_t reqlen;
991                                 struct hsm_user_request *req;
992
993                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
994                                 if (nr < 0)
995                                         RETURN(nr);
996                                 if (nr == 0) /* nothing for this MDS */
997                                         continue;
998
999                                 /* build a request with fids for this MDS */
1000                                 reqlen = offsetof(typeof(*hur),
1001                                                   hur_user_item[nr])
1002                                                 + hur->hur_request.hr_data_len;
1003                                 OBD_ALLOC_LARGE(req, reqlen);
1004                                 if (req == NULL)
1005                                         RETURN(-ENOMEM);
1006                                 rc1 = lmv_hsm_req_build(lmv, hur, tgt, req);
1007                                 if (rc1 < 0)
1008                                         GOTO(hsm_req_err, rc1);
1009                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1010                                                     req, uarg);
1011 hsm_req_err:
1012                                 if (rc1 != 0 && rc == 0)
1013                                         rc = rc1;
1014                                 OBD_FREE_LARGE(req, reqlen);
1015                         }
1016                 }
1017                 break;
1018         }
1019         case LL_IOC_LOV_SWAP_LAYOUTS: {
1020                 struct md_op_data *op_data = karg;
1021                 struct lmv_tgt_desc *tgt1, *tgt2;
1022
1023                 tgt1 = lmv_fid2tgt(lmv, &op_data->op_fid1);
1024                 if (IS_ERR(tgt1))
1025                         RETURN(PTR_ERR(tgt1));
1026
1027                 tgt2 = lmv_fid2tgt(lmv, &op_data->op_fid2);
1028                 if (IS_ERR(tgt2))
1029                         RETURN(PTR_ERR(tgt2));
1030
1031                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1032                         RETURN(-EINVAL);
1033
1034                 /* only files on same MDT can have their layouts swapped */
1035                 if (tgt1->ltd_index != tgt2->ltd_index)
1036                         RETURN(-EPERM);
1037
1038                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1039                 break;
1040         }
1041         case LL_IOC_HSM_CT_START: {
1042                 struct lustre_kernelcomm *lk = karg;
1043                 if (lk->lk_flags & LK_FLG_STOP)
1044                         rc = lmv_hsm_ct_unregister(obd, cmd, len, lk, uarg);
1045                 else
1046                         rc = lmv_hsm_ct_register(obd, cmd, len, lk, uarg);
1047                 break;
1048         }
1049         default:
1050                 lmv_foreach_connected_tgt(lmv, tgt) {
1051                         struct obd_device *mdc_obd;
1052                         int err;
1053
1054                         /* ll_umount_begin() sets force flag but for lmv, not
1055                          * mdc. Let's pass it through */
1056                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1057                         mdc_obd->obd_force = obd->obd_force;
1058                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1059                         if (err) {
1060                                 if (tgt->ltd_active) {
1061                                         CERROR("error: iocontrol MDC %s on MDT"
1062                                                " idx %d cmd %x: err = %d\n",
1063                                                tgt->ltd_uuid.uuid,
1064                                                tgt->ltd_index, cmd, err);
1065                                         if (!rc)
1066                                                 rc = err;
1067                                 }
1068                         } else
1069                                 set = 1;
1070                 }
1071                 if (!set && !rc)
1072                         rc = -EIO;
1073         }
1074         RETURN(rc);
1075 }
1076
1077 int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1078                   struct lu_fid *fid, struct md_op_data *op_data)
1079 {
1080         struct obd_device *obd = class_exp2obd(exp);
1081         struct lmv_obd *lmv = &obd->u.lmv;
1082         struct lmv_tgt_desc *tgt;
1083         int rc;
1084
1085         ENTRY;
1086
1087         LASSERT(op_data);
1088         LASSERT(fid);
1089
1090         tgt = lmv_tgt(lmv, op_data->op_mds);
1091         if (!tgt)
1092                 RETURN(-ENODEV);
1093
1094         if (!tgt->ltd_active || !tgt->ltd_exp)
1095                 RETURN(-ENODEV);
1096
1097         /*
1098          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1099          * on server that seq in new allocated fid is not yet known.
1100          */
1101         mutex_lock(&tgt->ltd_fid_mutex);
1102         rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1103         mutex_unlock(&tgt->ltd_fid_mutex);
1104         if (rc > 0) {
1105                 LASSERT(fid_is_sane(fid));
1106                 rc = 0;
1107         }
1108
1109         RETURN(rc);
1110 }
1111
1112 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1113 {
1114         struct lmv_obd *lmv = &obd->u.lmv;
1115         struct lmv_desc *desc;
1116         struct lnet_processid lnet_id;
1117         int i = 0;
1118         int rc;
1119
1120         ENTRY;
1121
1122         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1123                 CERROR("LMV setup requires a descriptor\n");
1124                 RETURN(-EINVAL);
1125         }
1126
1127         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1128         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1129                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1130                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1131                 RETURN(-EINVAL);
1132         }
1133
1134         obd_str2uuid(&lmv->lmv_mdt_descs.ltd_lmv_desc.ld_uuid,
1135                      desc->ld_uuid.uuid);
1136         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_tgt_count = 0;
1137         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count = 0;
1138         lmv->lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage =
1139                 LMV_DESC_QOS_MAXAGE_DEFAULT;
1140         lmv->max_def_easize = 0;
1141         lmv->max_easize = 0;
1142
1143         spin_lock_init(&lmv->lmv_lock);
1144
1145         /*
1146          * initialize rr_index to lower 32bit of netid, so that client
1147          * can distribute subdirs evenly from the beginning.
1148          */
1149         while (LNetGetId(i++, &lnet_id) != -ENOENT) {
1150                 if (!nid_is_lo0(&lnet_id.nid)) {
1151                         lmv->lmv_qos_rr_index = ntohl(lnet_id.nid.nid_addr[0]);
1152                         break;
1153                 }
1154         }
1155
1156         rc = lmv_tunables_init(obd);
1157         if (rc)
1158                 CWARN("%s: error adding LMV sysfs/debugfs files: rc = %d\n",
1159                       obd->obd_name, rc);
1160
1161         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1162                              LUSTRE_CLI_FLD_HASH_DHT);
1163         if (rc)
1164                 CERROR("Can't init FLD, err %d\n", rc);
1165
1166         rc = lu_tgt_descs_init(&lmv->lmv_mdt_descs, true);
1167         if (rc)
1168                 CWARN("%s: error initialize target table: rc = %d\n",
1169                       obd->obd_name, rc);
1170
1171         RETURN(rc);
1172 }
1173
1174 static int lmv_cleanup(struct obd_device *obd)
1175 {
1176         struct lmv_obd *lmv = &obd->u.lmv;
1177         struct lu_tgt_desc *tgt;
1178         struct lu_tgt_desc *tmp;
1179
1180         ENTRY;
1181
1182         fld_client_fini(&lmv->lmv_fld);
1183         fld_client_debugfs_fini(&lmv->lmv_fld);
1184
1185         lprocfs_obd_cleanup(obd);
1186         lprocfs_free_md_stats(obd);
1187
1188         lmv_foreach_tgt_safe(lmv, tgt, tmp)
1189                 lmv_del_target(lmv, tgt);
1190         lu_tgt_descs_fini(&lmv->lmv_mdt_descs);
1191
1192         RETURN(0);
1193 }
1194
1195 static int lmv_process_config(struct obd_device *obd, size_t len, void *buf)
1196 {
1197         struct lustre_cfg       *lcfg = buf;
1198         struct obd_uuid         obd_uuid;
1199         int                     gen;
1200         __u32                   index;
1201         int                     rc;
1202         ENTRY;
1203
1204         switch (lcfg->lcfg_command) {
1205         case LCFG_ADD_MDC:
1206                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1207                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1208                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1209                         GOTO(out, rc = -EINVAL);
1210
1211                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1212
1213                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1)
1214                         GOTO(out, rc = -EINVAL);
1215                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1216                         GOTO(out, rc = -EINVAL);
1217                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1218                 GOTO(out, rc);
1219         default:
1220                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1221                 GOTO(out, rc = -EINVAL);
1222         }
1223 out:
1224         RETURN(rc);
1225 }
1226
1227 static int lmv_select_statfs_mdt(struct lmv_obd *lmv, __u32 flags)
1228 {
1229         int i;
1230
1231         if (flags & OBD_STATFS_FOR_MDT0)
1232                 return 0;
1233
1234         if (lmv->lmv_statfs_start || lmv->lmv_mdt_count == 1)
1235                 return lmv->lmv_statfs_start;
1236
1237         /* choose initial MDT for this client */
1238         for (i = 0;; i++) {
1239                 struct lnet_processid lnet_id;
1240                 if (LNetGetId(i, &lnet_id) == -ENOENT)
1241                         break;
1242
1243                 if (!nid_is_lo0(&lnet_id.nid)) {
1244                         /* We dont need a full 64-bit modulus, just enough
1245                          * to distribute the requests across MDTs evenly.
1246                          */
1247                         lmv->lmv_statfs_start = nidhash(&lnet_id.nid) %
1248                                                 lmv->lmv_mdt_count;
1249                         break;
1250                 }
1251         }
1252
1253         return lmv->lmv_statfs_start;
1254 }
1255
1256 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1257                       struct obd_statfs *osfs, time64_t max_age, __u32 flags)
1258 {
1259         struct obd_device *obd = class_exp2obd(exp);
1260         struct lmv_obd *lmv = &obd->u.lmv;
1261         struct obd_statfs *temp;
1262         struct lu_tgt_desc *tgt;
1263         __u32 i;
1264         __u32 idx;
1265         int rc = 0;
1266         int err = 0;
1267
1268         ENTRY;
1269
1270         OBD_ALLOC(temp, sizeof(*temp));
1271         if (temp == NULL)
1272                 RETURN(-ENOMEM);
1273
1274         /* distribute statfs among MDTs */
1275         idx = lmv_select_statfs_mdt(lmv, flags);
1276
1277         for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++, idx++) {
1278                 idx = idx % lmv->lmv_mdt_descs.ltd_tgts_size;
1279                 tgt = lmv_tgt(lmv, idx);
1280                 if (!tgt || !tgt->ltd_exp)
1281                         continue;
1282
1283                 rc = obd_statfs(env, tgt->ltd_exp, temp, max_age,
1284                                 flags | OBD_STATFS_NESTED);
1285                 if (rc) {
1286                         CERROR("%s: can't stat MDS #%d: rc = %d\n",
1287                                tgt->ltd_exp->exp_obd->obd_name, i, rc);
1288                         err = rc;
1289                         /* Try another MDT */
1290                         if (flags & OBD_STATFS_SUM)
1291                                 continue;
1292                         GOTO(out_free_temp, rc);
1293                 }
1294
1295                 if (temp->os_state & OS_STATFS_SUM ||
1296                     flags == OBD_STATFS_FOR_MDT0) {
1297                         /* reset to the last aggregated values
1298                          * and don't sum with non-aggrated data */
1299                         /* If the statfs is from mount, it needs to retrieve
1300                          * necessary information from MDT0. i.e. mount does
1301                          * not need the merged osfs from all of MDT. Also
1302                          * clients can be mounted as long as MDT0 is in
1303                          * service */
1304                         *osfs = *temp;
1305                         GOTO(out_free_temp, rc);
1306                 }
1307
1308                 if (i == 0) {
1309                         *osfs = *temp;
1310                 } else {
1311                         osfs->os_bavail += temp->os_bavail;
1312                         osfs->os_blocks += temp->os_blocks;
1313                         osfs->os_ffree += temp->os_ffree;
1314                         osfs->os_files += temp->os_files;
1315                         osfs->os_granted += temp->os_granted;
1316                 }
1317         }
1318         /* There is no stats from some MDTs, data incomplete */
1319         if (err)
1320                 rc = err;
1321 out_free_temp:
1322         OBD_FREE(temp, sizeof(*temp));
1323         RETURN(rc);
1324 }
1325
1326 static int lmv_statfs_update(void *cookie, int rc)
1327 {
1328         struct obd_info *oinfo = cookie;
1329         struct obd_device *obd = oinfo->oi_obd;
1330         struct lmv_obd *lmv = &obd->u.lmv;
1331         struct lmv_tgt_desc *tgt = oinfo->oi_tgt;
1332         struct obd_statfs *osfs = oinfo->oi_osfs;
1333
1334         /*
1335          * NB: don't deactivate TGT upon error, because we may not trigger async
1336          * statfs any longer, then there is no chance to activate TGT.
1337          */
1338         if (!rc) {
1339                 spin_lock(&lmv->lmv_lock);
1340                 tgt->ltd_statfs = *osfs;
1341                 tgt->ltd_statfs_age = ktime_get_seconds();
1342                 spin_unlock(&lmv->lmv_lock);
1343                 set_bit(LQ_DIRTY, &lmv->lmv_qos.lq_flags);
1344         }
1345
1346         return rc;
1347 }
1348
1349 /* update tgt statfs async if it's ld_qos_maxage old */
1350 int lmv_statfs_check_update(struct obd_device *obd, struct lmv_tgt_desc *tgt)
1351 {
1352         struct obd_info oinfo = {
1353                 .oi_obd = obd,
1354                 .oi_tgt = tgt,
1355                 .oi_cb_up = lmv_statfs_update,
1356         };
1357         int rc;
1358
1359         if (ktime_get_seconds() - tgt->ltd_statfs_age <
1360             obd->u.lmv.lmv_mdt_descs.ltd_lmv_desc.ld_qos_maxage)
1361                 return 0;
1362
1363         rc = obd_statfs_async(tgt->ltd_exp, &oinfo, 0, NULL);
1364
1365         return rc;
1366 }
1367
1368 static int lmv_get_root(struct obd_export *exp, const char *fileset,
1369                         struct lu_fid *fid)
1370 {
1371         struct obd_device *obd = exp->exp_obd;
1372         struct lmv_obd *lmv = &obd->u.lmv;
1373         struct lu_tgt_desc *tgt = lmv_tgt(lmv, 0);
1374         int rc;
1375
1376         ENTRY;
1377
1378         if (!tgt)
1379                 RETURN(-ENODEV);
1380
1381         rc = md_get_root(tgt->ltd_exp, fileset, fid);
1382         RETURN(rc);
1383 }
1384
1385 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1386                         u64 obd_md_valid, const char *name, size_t buf_size,
1387                         struct ptlrpc_request **req)
1388 {
1389         struct obd_device *obd = exp->exp_obd;
1390         struct lmv_obd *lmv = &obd->u.lmv;
1391         struct lmv_tgt_desc *tgt;
1392         int rc;
1393
1394         ENTRY;
1395
1396         tgt = lmv_fid2tgt(lmv, fid);
1397         if (IS_ERR(tgt))
1398                 RETURN(PTR_ERR(tgt));
1399
1400         rc = md_getxattr(tgt->ltd_exp, fid, obd_md_valid, name, buf_size, req);
1401
1402         RETURN(rc);
1403 }
1404
1405 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1406                         u64 obd_md_valid, const char *name,
1407                         const void *value, size_t value_size,
1408                         unsigned int xattr_flags, u32 suppgid,
1409                         struct ptlrpc_request **req)
1410 {
1411         struct obd_device *obd = exp->exp_obd;
1412         struct lmv_obd *lmv = &obd->u.lmv;
1413         struct lmv_tgt_desc *tgt;
1414         int rc;
1415
1416         ENTRY;
1417
1418         tgt = lmv_fid2tgt(lmv, fid);
1419         if (IS_ERR(tgt))
1420                 RETURN(PTR_ERR(tgt));
1421
1422         rc = md_setxattr(tgt->ltd_exp, fid, obd_md_valid, name,
1423                          value, value_size, xattr_flags, suppgid, req);
1424
1425         RETURN(rc);
1426 }
1427
1428 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1429                        struct ptlrpc_request **request)
1430 {
1431         struct obd_device *obd = exp->exp_obd;
1432         struct lmv_obd *lmv = &obd->u.lmv;
1433         struct lmv_tgt_desc *tgt;
1434         int rc;
1435
1436         ENTRY;
1437
1438         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
1439         if (IS_ERR(tgt))
1440                 RETURN(PTR_ERR(tgt));
1441
1442         if (op_data->op_flags & MF_GET_MDT_IDX) {
1443                 op_data->op_mds = tgt->ltd_index;
1444                 RETURN(0);
1445         }
1446
1447         rc = md_getattr(tgt->ltd_exp, op_data, request);
1448
1449         RETURN(rc);
1450 }
1451
1452 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1453 {
1454         struct obd_device *obd = exp->exp_obd;
1455         struct lmv_obd *lmv = &obd->u.lmv;
1456         struct lu_tgt_desc *tgt;
1457
1458         ENTRY;
1459
1460         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1461
1462         /*
1463          * With DNE every object can have two locks in different namespaces:
1464          * lookup lock in space of MDT storing direntry and update/open lock in
1465          * space of MDT storing inode.
1466          */
1467         lmv_foreach_connected_tgt(lmv, tgt)
1468                 md_null_inode(tgt->ltd_exp, fid);
1469
1470         RETURN(0);
1471 }
1472
1473 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1474                      struct md_open_data *mod, struct ptlrpc_request **request)
1475 {
1476         struct obd_device *obd = exp->exp_obd;
1477         struct lmv_obd *lmv = &obd->u.lmv;
1478         struct lmv_tgt_desc *tgt;
1479         int rc;
1480
1481         ENTRY;
1482
1483         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
1484         if (IS_ERR(tgt))
1485                 RETURN(PTR_ERR(tgt));
1486
1487         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1488         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1489         RETURN(rc);
1490 }
1491
1492 static struct lu_tgt_desc *lmv_locate_tgt_qos(struct lmv_obd *lmv,
1493                                               struct md_op_data *op_data)
1494 {
1495         struct lu_tgt_desc *tgt, *cur = NULL;
1496         __u64 total_avail = 0;
1497         __u64 total_weight = 0;
1498         __u64 cur_weight = 0;
1499         int total_usable = 0;
1500         __u64 rand;
1501         int rc;
1502
1503         ENTRY;
1504
1505         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1506                 RETURN(ERR_PTR(-EAGAIN));
1507
1508         down_write(&lmv->lmv_qos.lq_rw_sem);
1509
1510         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1511                 GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1512
1513         rc = ltd_qos_penalties_calc(&lmv->lmv_mdt_descs);
1514         if (rc)
1515                 GOTO(unlock, tgt = ERR_PTR(rc));
1516
1517         lmv_foreach_tgt(lmv, tgt) {
1518                 if (!tgt->ltd_exp || !tgt->ltd_active) {
1519                         tgt->ltd_qos.ltq_usable = 0;
1520                         continue;
1521                 }
1522
1523                 tgt->ltd_qos.ltq_usable = 1;
1524                 lu_tgt_qos_weight_calc(tgt);
1525                 if (tgt->ltd_index == op_data->op_mds)
1526                         cur = tgt;
1527                 total_avail += tgt->ltd_qos.ltq_avail;
1528                 total_weight += tgt->ltd_qos.ltq_weight;
1529                 total_usable++;
1530         }
1531
1532         /* If current MDT has above-average space and dir is not aleady using
1533          * round-robin to spread across more MDTs, stay on the parent MDT
1534          * to avoid creating needless remote MDT directories.  Remote dirs
1535          * close to the root balance space more effectively than bottom dirs,
1536          * so prefer to create remote dirs at top level of directory tree.
1537          * "16 / (dir_depth + 10)" is the factor to make it less likely
1538          * for top-level directories to stay local unless they have more than
1539          * average free space, while deep dirs prefer local until more full.
1540          *    depth=0 -> 160%, depth=3 -> 123%, depth=6 -> 100%,
1541          *    depth=9 -> 84%, depth=12 -> 73%, depth=15 -> 64%
1542          */
1543         if (!lmv_op_default_rr_mkdir(op_data)) {
1544                 rand = total_avail * 16 /
1545                         (total_usable * (op_data->op_dir_depth + 10));
1546                 if (cur && cur->ltd_qos.ltq_avail >= rand) {
1547                         tgt = cur;
1548                         GOTO(unlock, tgt);
1549                 }
1550         }
1551
1552         rand = lu_prandom_u64_max(total_weight);
1553
1554         lmv_foreach_connected_tgt(lmv, tgt) {
1555                 if (!tgt->ltd_qos.ltq_usable)
1556                         continue;
1557
1558                 cur_weight += tgt->ltd_qos.ltq_weight;
1559                 if (cur_weight < rand)
1560                         continue;
1561
1562                 ltd_qos_update(&lmv->lmv_mdt_descs, tgt, &total_weight);
1563                 GOTO(unlock, tgt);
1564         }
1565
1566         /* no proper target found */
1567         GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1568 unlock:
1569         up_write(&lmv->lmv_qos.lq_rw_sem);
1570
1571         return tgt;
1572 }
1573
1574 static struct lu_tgt_desc *lmv_locate_tgt_rr(struct lmv_obd *lmv)
1575 {
1576         struct lu_tgt_desc *tgt;
1577         int i;
1578         int index;
1579
1580         ENTRY;
1581
1582         spin_lock(&lmv->lmv_lock);
1583         for (i = 0; i < lmv->lmv_mdt_descs.ltd_tgts_size; i++) {
1584                 index = (i + lmv->lmv_qos_rr_index) %
1585                         lmv->lmv_mdt_descs.ltd_tgts_size;
1586                 tgt = lmv_tgt(lmv, index);
1587                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
1588                         continue;
1589
1590                 lmv->lmv_qos_rr_index = (tgt->ltd_index + 1) %
1591                                         lmv->lmv_mdt_descs.ltd_tgts_size;
1592                 spin_unlock(&lmv->lmv_lock);
1593
1594                 RETURN(tgt);
1595         }
1596         spin_unlock(&lmv->lmv_lock);
1597
1598         RETURN(ERR_PTR(-ENODEV));
1599 }
1600
1601 /* locate MDT which is less full (avoid the most full MDT) */
1602 static struct lu_tgt_desc *lmv_locate_tgt_lf(struct lmv_obd *lmv)
1603 {
1604         struct lu_tgt_desc *min = NULL;
1605         struct lu_tgt_desc *tgt;
1606         __u64 avail = 0;
1607         __u64 rand;
1608
1609         ENTRY;
1610
1611         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1612                 RETURN(ERR_PTR(-EAGAIN));
1613
1614         down_write(&lmv->lmv_qos.lq_rw_sem);
1615
1616         if (!ltd_qos_is_usable(&lmv->lmv_mdt_descs))
1617                 GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1618
1619         lmv_foreach_tgt(lmv, tgt) {
1620                 if (!tgt->ltd_exp || !tgt->ltd_active) {
1621                         tgt->ltd_qos.ltq_usable = 0;
1622                         continue;
1623                 }
1624
1625                 tgt->ltd_qos.ltq_usable = 1;
1626                 lu_tgt_qos_weight_calc(tgt);
1627                 avail += tgt->ltd_qos.ltq_avail;
1628                 if (!min || min->ltd_qos.ltq_avail > tgt->ltd_qos.ltq_avail)
1629                         min = tgt;
1630         }
1631
1632         /* avoid the most full MDT */
1633         if (min)
1634                 avail -= min->ltd_qos.ltq_avail;
1635
1636         rand = lu_prandom_u64_max(avail);
1637         avail = 0;
1638         lmv_foreach_connected_tgt(lmv, tgt) {
1639                 if (!tgt->ltd_qos.ltq_usable)
1640                         continue;
1641
1642                 if (tgt == min)
1643                         continue;
1644
1645                 avail += tgt->ltd_qos.ltq_avail;
1646                 if (avail < rand)
1647                         continue;
1648
1649                 GOTO(unlock, tgt);
1650         }
1651
1652         /* no proper target found */
1653         GOTO(unlock, tgt = ERR_PTR(-EAGAIN));
1654 unlock:
1655         up_write(&lmv->lmv_qos.lq_rw_sem);
1656
1657         RETURN(tgt);
1658 }
1659
1660 /* locate MDT by file name, for striped directory, the file name hash decides
1661  * which stripe its dirent is stored.
1662  */
1663 static struct lmv_tgt_desc *
1664 lmv_locate_tgt_by_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1665                        const char *name, int namelen, struct lu_fid *fid,
1666                        __u32 *mds, bool new_layout)
1667 {
1668         struct lmv_tgt_desc *tgt;
1669         const struct lmv_oinfo *oinfo;
1670
1671         if (!lmv_dir_striped(lsm) || !namelen) {
1672                 tgt = lmv_fid2tgt(lmv, fid);
1673                 if (IS_ERR(tgt))
1674                         return tgt;
1675
1676                 *mds = tgt->ltd_index;
1677                 return tgt;
1678         }
1679
1680         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) {
1681                 if (cfs_fail_val >= lsm->lsm_md_stripe_count)
1682                         return ERR_PTR(-EBADF);
1683                 oinfo = &lsm->lsm_md_oinfo[cfs_fail_val];
1684         } else {
1685                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen, new_layout);
1686                 if (IS_ERR(oinfo))
1687                         return ERR_CAST(oinfo);
1688         }
1689
1690         /* check stripe FID is sane */
1691         if (!fid_is_sane(&oinfo->lmo_fid))
1692                 return ERR_PTR(-ENODEV);
1693
1694         *fid = oinfo->lmo_fid;
1695         *mds = oinfo->lmo_mds;
1696         tgt = lmv_tgt(lmv, oinfo->lmo_mds);
1697
1698         CDEBUG(D_INODE, "locate MDT %u parent "DFID"\n", *mds, PFID(fid));
1699
1700         return tgt ? tgt : ERR_PTR(-ENODEV);
1701 }
1702
1703 /**
1704  * Locate MDT of op_data->op_fid1
1705  *
1706  * For striped directory, it will locate the stripe by name hash, if hash_type
1707  * is unknown, it will return the stripe specified by 'op_data->op_stripe_index'
1708  * which is set outside, and if dir is migrating, 'op_data->op_new_layout'
1709  * indicates whether old or new layout is used to locate.
1710  *
1711  * For plain direcotry, it just locate the MDT of op_data->op_fid1.
1712  *
1713  * \param[in] lmv               LMV device
1714  * \param[in/out] op_data       client MD stack parameters, name, namelen etc,
1715  *                              op_mds and op_fid1 will be updated if op_mea1
1716  *                              indicates fid1 represents a striped directory.
1717  *
1718  * retval               pointer to the lmv_tgt_desc if succeed.
1719  *                      ERR_PTR(errno) if failed.
1720  */
1721 struct lmv_tgt_desc *
1722 lmv_locate_tgt(struct lmv_obd *lmv, struct md_op_data *op_data)
1723 {
1724         struct lmv_stripe_md *lsm = op_data->op_mea1;
1725         struct lmv_oinfo *oinfo;
1726         struct lmv_tgt_desc *tgt;
1727
1728         if (lmv_dir_foreign(lsm))
1729                 return ERR_PTR(-ENODATA);
1730
1731         /* During creating VOLATILE file, it should honor the mdt
1732          * index if the file under striped dir is being restored, see
1733          * ct_restore(). */
1734         if (op_data->op_bias & MDS_CREATE_VOLATILE &&
1735             op_data->op_mds != LMV_OFFSET_DEFAULT) {
1736                 tgt = lmv_tgt(lmv, op_data->op_mds);
1737                 if (!tgt)
1738                         return ERR_PTR(-ENODEV);
1739
1740                 if (lmv_dir_striped(lsm)) {
1741                         int i;
1742
1743                         /* refill the right parent fid */
1744                         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
1745                                 oinfo = &lsm->lsm_md_oinfo[i];
1746                                 if (oinfo->lmo_mds == op_data->op_mds) {
1747                                         op_data->op_fid1 = oinfo->lmo_fid;
1748                                         break;
1749                                 }
1750                         }
1751
1752                         if (i == lsm->lsm_md_stripe_count)
1753                                 op_data->op_fid1 = lsm->lsm_md_oinfo[0].lmo_fid;
1754                 }
1755         } else if (lmv_dir_bad_hash(lsm)) {
1756                 LASSERT(op_data->op_stripe_index < lsm->lsm_md_stripe_count);
1757                 oinfo = &lsm->lsm_md_oinfo[op_data->op_stripe_index];
1758
1759                 op_data->op_fid1 = oinfo->lmo_fid;
1760                 op_data->op_mds = oinfo->lmo_mds;
1761                 tgt = lmv_tgt(lmv, oinfo->lmo_mds);
1762                 if (!tgt)
1763                         return ERR_PTR(-ENODEV);
1764         } else {
1765                 tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea1,
1766                                 op_data->op_name, op_data->op_namelen,
1767                                 &op_data->op_fid1, &op_data->op_mds,
1768                                 op_data->op_new_layout);
1769         }
1770
1771         return tgt;
1772 }
1773
1774 /* Locate MDT of op_data->op_fid2 for link/rename */
1775 static struct lmv_tgt_desc *
1776 lmv_locate_tgt2(struct lmv_obd *lmv, struct md_op_data *op_data)
1777 {
1778         struct lmv_tgt_desc *tgt;
1779         int rc;
1780
1781         LASSERT(op_data->op_name);
1782         if (lmv_dir_layout_changing(op_data->op_mea2)) {
1783                 struct lu_fid fid1 = op_data->op_fid1;
1784                 struct lmv_stripe_md *lsm1 = op_data->op_mea1;
1785                 struct ptlrpc_request *request = NULL;
1786
1787                 /*
1788                  * avoid creating new file under old layout of migrating
1789                  * directory, check it here.
1790                  */
1791                 tgt = lmv_locate_tgt_by_name(lmv, op_data->op_mea2,
1792                                 op_data->op_name, op_data->op_namelen,
1793                                 &op_data->op_fid2, &op_data->op_mds, false);
1794                 if (IS_ERR(tgt))
1795                         RETURN(tgt);
1796
1797                 op_data->op_fid1 = op_data->op_fid2;
1798                 op_data->op_mea1 = op_data->op_mea2;
1799                 rc = md_getattr_name(tgt->ltd_exp, op_data, &request);
1800                 op_data->op_fid1 = fid1;
1801                 op_data->op_mea1 = lsm1;
1802                 if (!rc) {
1803                         ptlrpc_req_finished(request);
1804                         RETURN(ERR_PTR(-EEXIST));
1805                 }
1806
1807                 if (rc != -ENOENT)
1808                         RETURN(ERR_PTR(rc));
1809         }
1810
1811         return lmv_locate_tgt_by_name(lmv, op_data->op_mea2, op_data->op_name,
1812                                 op_data->op_namelen, &op_data->op_fid2,
1813                                 &op_data->op_mds, true);
1814 }
1815
1816 int lmv_old_layout_lookup(struct lmv_obd *lmv, struct md_op_data *op_data)
1817 {
1818         struct lu_tgt_desc *tgt;
1819         struct ptlrpc_request *request;
1820         int rc;
1821
1822         LASSERT(lmv_dir_layout_changing(op_data->op_mea1));
1823         LASSERT(!op_data->op_new_layout);
1824
1825         tgt = lmv_locate_tgt(lmv, op_data);
1826         if (IS_ERR(tgt))
1827                 return PTR_ERR(tgt);
1828
1829         rc = md_getattr_name(tgt->ltd_exp, op_data, &request);
1830         if (!rc) {
1831                 ptlrpc_req_finished(request);
1832                 return -EEXIST;
1833         }
1834
1835         return rc;
1836 }
1837
1838 /* mkdir by QoS upon 'lfs mkdir -i -1'.
1839  *
1840  * NB, mkdir by QoS only if parent is not striped, this is to avoid remote
1841  * directories under striped directory.
1842  */
1843 static inline bool lmv_op_user_qos_mkdir(const struct md_op_data *op_data)
1844 {
1845         const struct lmv_user_md *lum = op_data->op_data;
1846
1847         if (op_data->op_code != LUSTRE_OPC_MKDIR)
1848                 return false;
1849
1850         if (lmv_dir_striped(op_data->op_mea1))
1851                 return false;
1852
1853         return (op_data->op_cli_flags & CLI_SET_MEA) && lum &&
1854                le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC &&
1855                le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT;
1856 }
1857
1858 /* mkdir by QoS if either ROOT or parent default LMV is space balanced. */
1859 static inline bool lmv_op_default_qos_mkdir(const struct md_op_data *op_data)
1860 {
1861         const struct lmv_stripe_md *lsm = op_data->op_default_mea1;
1862
1863         if (op_data->op_code != LUSTRE_OPC_MKDIR)
1864                 return false;
1865
1866         if (lmv_dir_striped(op_data->op_mea1))
1867                 return false;
1868
1869         return (op_data->op_flags & MF_QOS_MKDIR) ||
1870                (lsm && lsm->lsm_md_master_mdt_index == LMV_OFFSET_DEFAULT);
1871 }
1872
1873 /* if parent default LMV is space balanced, and
1874  * 1. max_inherit_rr is set
1875  * 2. or parent is ROOT
1876  * mkdir roundrobin. Or if parent doesn't have default LMV, while ROOT default
1877  * LMV requests roundrobin mkdir, do the same.
1878  * NB, this needs to check server is balanced, which is done by caller.
1879  */
1880 static inline bool lmv_op_default_rr_mkdir(const struct md_op_data *op_data)
1881 {
1882         const struct lmv_stripe_md *lsm = op_data->op_default_mea1;
1883
1884         return (op_data->op_flags & MF_RR_MKDIR) ||
1885                (lsm && lsm->lsm_md_max_inherit_rr != LMV_INHERIT_RR_NONE) ||
1886                fid_is_root(&op_data->op_fid1);
1887 }
1888
1889 /* 'lfs mkdir -i <specific_MDT>' */
1890 static inline bool lmv_op_user_specific_mkdir(const struct md_op_data *op_data)
1891 {
1892         const struct lmv_user_md *lum = op_data->op_data;
1893
1894         return op_data->op_code == LUSTRE_OPC_MKDIR &&
1895                op_data->op_cli_flags & CLI_SET_MEA && lum &&
1896                (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC ||
1897                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) &&
1898                le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT;
1899 }
1900
1901 /* parent default LMV master_mdt_index is not -1. */
1902 static inline bool
1903 lmv_op_default_specific_mkdir(const struct md_op_data *op_data)
1904 {
1905         return op_data->op_code == LUSTRE_OPC_MKDIR &&
1906                op_data->op_default_mea1 &&
1907                op_data->op_default_mea1->lsm_md_master_mdt_index !=
1908                         LMV_OFFSET_DEFAULT;
1909 }
1910
1911 /* locate MDT by space usage */
1912 static struct lu_tgt_desc *lmv_locate_tgt_by_space(struct lmv_obd *lmv,
1913                                                    struct md_op_data *op_data,
1914                                                    struct lmv_tgt_desc *tgt)
1915 {
1916         struct lmv_tgt_desc *tmp = tgt;
1917
1918         tgt = lmv_locate_tgt_qos(lmv, op_data);
1919         if (tgt == ERR_PTR(-EAGAIN)) {
1920                 if (ltd_qos_is_balanced(&lmv->lmv_mdt_descs) &&
1921                     !lmv_op_default_rr_mkdir(op_data) &&
1922                     !lmv_op_user_qos_mkdir(op_data))
1923                         /* if not necessary, don't create remote directory. */
1924                         tgt = tmp;
1925                 else
1926                         tgt = lmv_locate_tgt_rr(lmv);
1927         }
1928
1929         /*
1930          * only update statfs after QoS mkdir, this means the cached statfs may
1931          * be stale, and current mkdir may not follow QoS accurately, but it's
1932          * not serious, and avoids periodic statfs when client doesn't mkdir by
1933          * QoS.
1934          */
1935         if (!IS_ERR(tgt)) {
1936                 op_data->op_mds = tgt->ltd_index;
1937                 lmv_statfs_check_update(lmv2obd_dev(lmv), tgt);
1938         }
1939
1940         return tgt;
1941 }
1942
1943 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1944                 const void *data, size_t datalen, umode_t mode, uid_t uid,
1945                 gid_t gid, kernel_cap_t cap_effective, __u64 rdev,
1946                 struct ptlrpc_request **request)
1947 {
1948         struct obd_device *obd = exp->exp_obd;
1949         struct lmv_obd *lmv = &obd->u.lmv;
1950         struct lmv_tgt_desc *tgt;
1951         struct mdt_body *repbody;
1952         int rc;
1953
1954         ENTRY;
1955
1956         if (!lmv->lmv_mdt_descs.ltd_lmv_desc.ld_active_tgt_count)
1957                 RETURN(-EIO);
1958
1959         if (lmv_dir_bad_hash(op_data->op_mea1))
1960                 RETURN(-EBADF);
1961
1962         if (lmv_dir_layout_changing(op_data->op_mea1)) {
1963                 /*
1964                  * if parent is migrating, create() needs to lookup existing
1965                  * name in both old and new layout, check old layout on client.
1966                  */
1967                 rc = lmv_old_layout_lookup(lmv, op_data);
1968                 if (rc != -ENOENT)
1969                         RETURN(rc);
1970
1971                 op_data->op_new_layout = true;
1972         }
1973
1974         tgt = lmv_locate_tgt(lmv, op_data);
1975         if (IS_ERR(tgt))
1976                 RETURN(PTR_ERR(tgt));
1977
1978         /* the order to apply policy in mkdir:
1979          * 1. is "lfs mkdir -i N"? mkdir on MDT N.
1980          * 2. is "lfs mkdir -i -1"? mkdir by space usage.
1981          * 3. is starting MDT specified in default LMV? mkdir on MDT N.
1982          * 4. is default LMV space balanced? mkdir by space usage.
1983          */
1984         if (lmv_op_user_specific_mkdir(op_data)) {
1985                 struct lmv_user_md *lum = op_data->op_data;
1986
1987                 op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset);
1988                 tgt = lmv_tgt(lmv, op_data->op_mds);
1989                 if (!tgt)
1990                         RETURN(-ENODEV);
1991         } else if (lmv_op_user_qos_mkdir(op_data)) {
1992                 tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt);
1993                 if (IS_ERR(tgt))
1994                         RETURN(PTR_ERR(tgt));
1995         } else if (lmv_op_default_specific_mkdir(op_data)) {
1996                 op_data->op_mds =
1997                         op_data->op_default_mea1->lsm_md_master_mdt_index;
1998                 tgt = lmv_tgt(lmv, op_data->op_mds);
1999                 if (!tgt)
2000                         RETURN(-ENODEV);
2001         } else if (lmv_op_default_qos_mkdir(op_data)) {
2002                 tgt = lmv_locate_tgt_by_space(lmv, op_data, tgt);
2003                 if (IS_ERR(tgt))
2004                         RETURN(PTR_ERR(tgt));
2005         }
2006
2007 retry:
2008         rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
2009         if (rc)
2010                 RETURN(rc);
2011
2012         CDEBUG(D_INODE, "CREATE name '%.*s' "DFID" on "DFID" -> mds #%x\n",
2013                 (int)op_data->op_namelen, op_data->op_name,
2014                 PFID(&op_data->op_fid2), PFID(&op_data->op_fid1),
2015                 op_data->op_mds);
2016
2017         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2018         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
2019                        cap_effective, rdev, request);
2020         if (rc == 0) {
2021                 if (*request == NULL)
2022                         RETURN(rc);
2023                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
2024         }
2025
2026         /* dir restripe needs to send to MDT where dir is located */
2027         if (rc != -EREMOTE ||
2028             !(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH))
2029                 RETURN(rc);
2030
2031         repbody = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2032         if (repbody == NULL)
2033                 RETURN(-EPROTO);
2034
2035         /* Not cross-ref case, just get out of here. */
2036         if (likely(!(repbody->mbo_valid & OBD_MD_MDS)))
2037                 RETURN(rc);
2038
2039         op_data->op_fid2 = repbody->mbo_fid1;
2040         ptlrpc_req_finished(*request);
2041         *request = NULL;
2042
2043         tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
2044         if (IS_ERR(tgt))
2045                 RETURN(PTR_ERR(tgt));
2046
2047         op_data->op_mds = tgt->ltd_index;
2048         goto retry;
2049 }
2050
2051 static int
2052 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
2053             const union ldlm_policy_data *policy, struct md_op_data *op_data,
2054             struct lustre_handle *lockh, __u64 extra_lock_flags)
2055 {
2056         struct obd_device *obd = exp->exp_obd;
2057         struct lmv_obd *lmv = &obd->u.lmv;
2058         struct lmv_tgt_desc *tgt;
2059         int rc;
2060
2061         ENTRY;
2062
2063         CDEBUG(D_INODE, "ENQUEUE on "DFID"\n", PFID(&op_data->op_fid1));
2064
2065         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2066         if (IS_ERR(tgt))
2067                 RETURN(PTR_ERR(tgt));
2068
2069         CDEBUG(D_INODE, "ENQUEUE on "DFID" -> mds #%u\n",
2070                PFID(&op_data->op_fid1), tgt->ltd_index);
2071
2072         rc = md_enqueue(tgt->ltd_exp, einfo, policy, op_data, lockh,
2073                         extra_lock_flags);
2074
2075         RETURN(rc);
2076 }
2077
2078 int
2079 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
2080                  struct ptlrpc_request **preq)
2081 {
2082         struct obd_device *obd = exp->exp_obd;
2083         struct lmv_obd *lmv = &obd->u.lmv;
2084         struct lmv_tgt_desc *tgt;
2085         struct mdt_body *body;
2086         int rc;
2087
2088         ENTRY;
2089
2090 retry:
2091         if (op_data->op_namelen == 2 &&
2092             op_data->op_name[0] == '.' && op_data->op_name[1] == '.')
2093                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2094         else
2095                 tgt = lmv_locate_tgt(lmv, op_data);
2096         if (IS_ERR(tgt))
2097                 RETURN(PTR_ERR(tgt));
2098
2099         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
2100                 (int)op_data->op_namelen, op_data->op_name,
2101                 PFID(&op_data->op_fid1), tgt->ltd_index);
2102
2103         rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
2104         if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) {
2105                 ptlrpc_req_finished(*preq);
2106                 *preq = NULL;
2107                 goto retry;
2108         }
2109
2110         if (rc)
2111                 RETURN(rc);
2112
2113         body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
2114         LASSERT(body != NULL);
2115
2116         if (body->mbo_valid & OBD_MD_MDS) {
2117                 op_data->op_fid1 = body->mbo_fid1;
2118                 op_data->op_valid |= OBD_MD_FLCROSSREF;
2119                 op_data->op_namelen = 0;
2120                 op_data->op_name = NULL;
2121
2122                 ptlrpc_req_finished(*preq);
2123                 *preq = NULL;
2124
2125                 goto retry;
2126         }
2127
2128         RETURN(rc);
2129 }
2130
/* Map one MF_MDC_CANCEL_FIDn flag to the address of the matching op_fidN in
 * \a op_data; evaluates to NULL for any other value.
 *
 * Both arguments are parenthesized so that expressions (e.g. "&data" or
 * "a | b") expand correctly. NB, \a fl may be evaluated up to four times, do
 * not pass an expression with side effects.
 */
#define md_op_data_fid(op_data, fl)                             \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :     \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :     \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :     \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :     \
         NULL)
2137
2138 static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
2139                             struct md_op_data *op_data, __u32 op_tgt,
2140                             enum ldlm_mode mode, int bits, int flag)
2141 {
2142         struct lu_fid *fid = md_op_data_fid(op_data, flag);
2143         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2144         union ldlm_policy_data policy = { { 0 } };
2145         int rc = 0;
2146         ENTRY;
2147
2148         if (!fid_is_sane(fid))
2149                 RETURN(0);
2150
2151         if (tgt == NULL) {
2152                 tgt = lmv_fid2tgt(lmv, fid);
2153                 if (IS_ERR(tgt))
2154                         RETURN(PTR_ERR(tgt));
2155         }
2156
2157         if (tgt->ltd_index != op_tgt) {
2158                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
2159                 policy.l_inodebits.bits = bits;
2160                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
2161                                       mode, LCF_ASYNC, NULL);
2162         } else {
2163                 CDEBUG(D_INODE,
2164                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
2165                        op_tgt, PFID(fid));
2166                 op_data->op_flags |= flag;
2167                 rc = 0;
2168         }
2169
2170         RETURN(rc);
2171 }
2172
2173 /*
2174  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
2175  * op_data->op_fid2
2176  */
2177 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
2178                     struct ptlrpc_request **request)
2179 {
2180         struct obd_device       *obd = exp->exp_obd;
2181         struct lmv_obd          *lmv = &obd->u.lmv;
2182         struct lmv_tgt_desc     *tgt;
2183         int                      rc;
2184         ENTRY;
2185
2186         LASSERT(op_data->op_namelen != 0);
2187
2188         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
2189                PFID(&op_data->op_fid2), (int)op_data->op_namelen,
2190                op_data->op_name, PFID(&op_data->op_fid1));
2191
2192         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2193         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2194         op_data->op_cap = current_cap();
2195
2196         tgt = lmv_locate_tgt2(lmv, op_data);
2197         if (IS_ERR(tgt))
2198                 RETURN(PTR_ERR(tgt));
2199
2200         /*
2201          * Cancel UPDATE lock on child (fid1).
2202          */
2203         op_data->op_flags |= MF_MDC_CANCEL_FID2;
2204         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_index, LCK_EX,
2205                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2206         if (rc != 0)
2207                 RETURN(rc);
2208
2209         rc = md_link(tgt->ltd_exp, op_data, request);
2210
2211         RETURN(rc);
2212 }
2213
2214 /* migrate the top directory */
2215 static inline bool lmv_op_topdir_migrate(const struct md_op_data *op_data)
2216 {
2217         if (!S_ISDIR(op_data->op_mode))
2218                 return false;
2219
2220         if (lmv_dir_layout_changing(op_data->op_mea1))
2221                 return false;
2222
2223         return true;
2224 }
2225
2226 /* migrate top dir to specific MDTs */
2227 static inline bool lmv_topdir_specific_migrate(const struct md_op_data *op_data)
2228 {
2229         const struct lmv_user_md *lum = op_data->op_data;
2230
2231         if (!lmv_op_topdir_migrate(op_data))
2232                 return false;
2233
2234         return le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT;
2235 }
2236
2237 /* migrate top dir in QoS mode if user issued "lfs migrate -m -1..." */
2238 static inline bool lmv_topdir_qos_migrate(const struct md_op_data *op_data)
2239 {
2240         const struct lmv_user_md *lum = op_data->op_data;
2241
2242         if (!lmv_op_topdir_migrate(op_data))
2243                 return false;
2244
2245         return le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT;
2246 }
2247
2248 static inline bool lmv_subdir_specific_migrate(const struct md_op_data *op_data)
2249 {
2250         const struct lmv_user_md *lum = op_data->op_data;
2251
2252         if (!S_ISDIR(op_data->op_mode))
2253                 return false;
2254
2255         if (!lmv_dir_layout_changing(op_data->op_mea1))
2256                 return false;
2257
2258         return le32_to_cpu(lum->lum_stripe_offset) != LMV_OFFSET_DEFAULT;
2259 }
2260
2261 static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data,
2262                         const char *name, size_t namelen,
2263                         struct ptlrpc_request **request)
2264 {
2265         struct obd_device *obd = exp->exp_obd;
2266         struct lmv_obd *lmv = &obd->u.lmv;
2267         struct lmv_stripe_md *lsm = op_data->op_mea1;
2268         struct lmv_tgt_desc *parent_tgt;
2269         struct lmv_tgt_desc *sp_tgt;
2270         struct lmv_tgt_desc *tp_tgt = NULL;
2271         struct lmv_tgt_desc *child_tgt;
2272         struct lmv_tgt_desc *tgt;
2273         struct lu_fid target_fid = { 0 };
2274         int rc;
2275
2276         ENTRY;
2277
2278         LASSERT(op_data->op_cli_flags & CLI_MIGRATE);
2279
2280         CDEBUG(D_INODE, "MIGRATE "DFID"/%.*s\n",
2281                PFID(&op_data->op_fid1), (int)namelen, name);
2282
2283         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2284         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2285         op_data->op_cap = current_cap();
2286
2287         parent_tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2288         if (IS_ERR(parent_tgt))
2289                 RETURN(PTR_ERR(parent_tgt));
2290
2291         if (lmv_dir_striped(lsm)) {
2292                 const struct lmv_oinfo *oinfo;
2293
2294                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen, false);
2295                 if (IS_ERR(oinfo))
2296                         RETURN(PTR_ERR(oinfo));
2297
2298                 /* save source stripe FID in fid4 temporarily for ELC */
2299                 op_data->op_fid4 = oinfo->lmo_fid;
2300                 sp_tgt = lmv_tgt(lmv, oinfo->lmo_mds);
2301                 if (!sp_tgt)
2302                         RETURN(-ENODEV);
2303
2304                 /*
2305                  * if parent is being migrated too, fill op_fid2 with target
2306                  * stripe fid, otherwise the target stripe is not created yet.
2307                  */
2308                 if (lmv_dir_layout_changing(lsm)) {
2309                         oinfo = lsm_name_to_stripe_info(lsm, name, namelen,
2310                                                         true);
2311                         if (IS_ERR(oinfo))
2312                                 RETURN(PTR_ERR(oinfo));
2313
2314                         op_data->op_fid2 = oinfo->lmo_fid;
2315                         tp_tgt = lmv_tgt(lmv, oinfo->lmo_mds);
2316                         if (!tp_tgt)
2317                                 RETURN(-ENODEV);
2318
2319                         /* parent unchanged and update namespace only */
2320                         if (lu_fid_eq(&op_data->op_fid4, &op_data->op_fid2) &&
2321                             op_data->op_bias & MDS_MIGRATE_NSONLY)
2322                                 RETURN(-EALREADY);
2323                 }
2324         } else {
2325                 sp_tgt = parent_tgt;
2326         }
2327
2328         child_tgt = lmv_fid2tgt(lmv, &op_data->op_fid3);
2329         if (IS_ERR(child_tgt))
2330                 RETURN(PTR_ERR(child_tgt));
2331
2332         if (lmv_topdir_specific_migrate(op_data)) {
2333                 struct lmv_user_md *lum = op_data->op_data;
2334
2335                 op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset);
2336         } else if (lmv_topdir_qos_migrate(op_data)) {
2337                 tgt = lmv_locate_tgt_lf(lmv);
2338                 if (tgt == ERR_PTR(-EAGAIN))
2339                         tgt = lmv_locate_tgt_rr(lmv);
2340                 if (IS_ERR(tgt))
2341                         RETURN(PTR_ERR(tgt));
2342
2343                 op_data->op_mds = tgt->ltd_index;
2344         } else if (lmv_subdir_specific_migrate(op_data)) {
2345                 struct lmv_user_md *lum = op_data->op_data;
2346                 __u32 i;
2347
2348                 LASSERT(tp_tgt);
2349                 if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) {
2350                         /* adjust MDTs in lum, since subdir is located on where
2351                          * its parent stripe is, not the first specified MDT.
2352                          */
2353                         for (i = 0; i < le32_to_cpu(lum->lum_stripe_count);
2354                              i++) {
2355                                 if (le32_to_cpu(lum->lum_objects[i].lum_mds) ==
2356                                     tp_tgt->ltd_index)
2357                                         break;
2358                         }
2359
2360                         if (i == le32_to_cpu(lum->lum_stripe_count))
2361                                 RETURN(-ENODEV);
2362
2363                         lum->lum_objects[i].lum_mds =
2364                                 lum->lum_objects[0].lum_mds;
2365                         lum->lum_objects[0].lum_mds =
2366                                 cpu_to_le32(tp_tgt->ltd_index);
2367                 }
2368                 /* NB, the above adjusts subdir migration for command like
2369                  * "lfs migrate -m 0,1,2 ...", but for migration like
2370                  * "lfs migrate -m 0 -c 2 ...", the top dir is migrated to MDT0
2371                  * and MDT1, however its subdir may be migrated to MDT1 and MDT2
2372                  */
2373
2374                 lum->lum_stripe_offset = cpu_to_le32(tp_tgt->ltd_index);
2375                 op_data->op_mds = tp_tgt->ltd_index;
2376         } else if (tp_tgt) {
2377                 op_data->op_mds = tp_tgt->ltd_index;
2378         } else {
2379                 op_data->op_mds = sp_tgt->ltd_index;
2380         }
2381
2382         rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data);
2383         if (rc)
2384                 RETURN(rc);
2385
2386         /*
2387          * for directory, send migrate request to the MDT where the object will
2388          * be migrated to, because we can't create a striped directory remotely.
2389          *
2390          * otherwise, send to the MDT where source is located because regular
2391          * file may open lease.
2392          *
2393          * NB. if MDT doesn't support DIR_MIGRATE, send to source MDT too for
2394          * backward compatibility.
2395          */
2396         if (S_ISDIR(op_data->op_mode) &&
2397             (exp_connect_flags2(exp) & OBD_CONNECT2_DIR_MIGRATE)) {
2398                 tgt = lmv_fid2tgt(lmv, &target_fid);
2399                 if (IS_ERR(tgt))
2400                         RETURN(PTR_ERR(tgt));
2401         } else {
2402                 tgt = child_tgt;
2403         }
2404
2405         /* cancel UPDATE lock of parent master object */
2406         rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_index, LCK_EX,
2407                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2408         if (rc)
2409                 RETURN(rc);
2410
2411         /* cancel UPDATE lock of source parent */
2412         if (sp_tgt != parent_tgt) {
2413                 /*
2414                  * migrate RPC packs master object FID, because we can only pack
2415                  * two FIDs in reint RPC, but MDS needs to know both source
2416                  * parent and target parent, and it will obtain them from master
2417                  * FID and LMV, the other FID in RPC is kept for target.
2418                  *
2419                  * since this FID is not passed to MDC, cancel it anyway.
2420                  */
2421                 rc = lmv_early_cancel(exp, sp_tgt, op_data, -1, LCK_EX,
2422                                       MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID4);
2423                 if (rc)
2424                         RETURN(rc);
2425
2426                 op_data->op_flags &= ~MF_MDC_CANCEL_FID4;
2427         }
2428         op_data->op_fid4 = target_fid;
2429
2430         /* cancel UPDATE locks of target parent */
2431         rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_index, LCK_EX,
2432                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
2433         if (rc)
2434                 RETURN(rc);
2435
2436         /* cancel LOOKUP lock of source if source is remote object */
2437         if (child_tgt != sp_tgt) {
2438                 rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_index,
2439                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2440                                       MF_MDC_CANCEL_FID3);
2441                 if (rc)
2442                         RETURN(rc);
2443         }
2444
2445         /* cancel ELC locks of source */
2446         rc = lmv_early_cancel(exp, child_tgt, op_data, tgt->ltd_index, LCK_EX,
2447                               MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
2448         if (rc)
2449                 RETURN(rc);
2450
2451         rc = md_rename(tgt->ltd_exp, op_data, name, namelen, NULL, 0, request);
2452
2453         RETURN(rc);
2454 }
2455
2456 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
2457                       const char *old, size_t oldlen,
2458                       const char *new, size_t newlen,
2459                       struct ptlrpc_request **request)
2460 {
2461         struct obd_device *obd = exp->exp_obd;
2462         struct lmv_obd *lmv = &obd->u.lmv;
2463         struct lmv_tgt_desc *sp_tgt;
2464         struct lmv_tgt_desc *tp_tgt = NULL;
2465         struct lmv_tgt_desc *src_tgt = NULL;
2466         struct lmv_tgt_desc *tgt;
2467         struct mdt_body *body;
2468         int rc;
2469
2470         ENTRY;
2471
2472         LASSERT(oldlen != 0);
2473
2474         if (op_data->op_cli_flags & CLI_MIGRATE) {
2475                 rc = lmv_migrate(exp, op_data, old, oldlen, request);
2476                 RETURN(rc);
2477         }
2478
2479         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2480         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2481         op_data->op_cap = current_cap();
2482
2483         op_data->op_name = new;
2484         op_data->op_namelen = newlen;
2485
2486         tp_tgt = lmv_locate_tgt2(lmv, op_data);
2487         if (IS_ERR(tp_tgt))
2488                 RETURN(PTR_ERR(tp_tgt));
2489
2490         /* Since the target child might be destroyed, and it might become
2491          * orphan, and we can only check orphan on the local MDT right now, so
2492          * we send rename request to the MDT where target child is located. If
2493          * target child does not exist, then it will send the request to the
2494          * target parent */
2495         if (fid_is_sane(&op_data->op_fid4)) {
2496                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid4);
2497                 if (IS_ERR(tgt))
2498                         RETURN(PTR_ERR(tgt));
2499         } else {
2500                 tgt = tp_tgt;
2501         }
2502
2503         op_data->op_flags |= MF_MDC_CANCEL_FID4;
2504
2505         /* cancel UPDATE locks of target parent */
2506         rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_index, LCK_EX,
2507                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
2508         if (rc != 0)
2509                 RETURN(rc);
2510
2511         if (fid_is_sane(&op_data->op_fid4)) {
2512                 /* cancel LOOKUP lock of target on target parent */
2513                 if (tgt != tp_tgt) {
2514                         rc = lmv_early_cancel(exp, tp_tgt, op_data,
2515                                               tgt->ltd_index, LCK_EX,
2516                                               MDS_INODELOCK_LOOKUP,
2517                                               MF_MDC_CANCEL_FID4);
2518                         if (rc != 0)
2519                                 RETURN(rc);
2520                 }
2521         }
2522
2523         if (fid_is_sane(&op_data->op_fid3)) {
2524                 src_tgt = lmv_fid2tgt(lmv, &op_data->op_fid3);
2525                 if (IS_ERR(src_tgt))
2526                         RETURN(PTR_ERR(src_tgt));
2527
2528                 /* cancel ELC locks of source */
2529                 rc = lmv_early_cancel(exp, src_tgt, op_data, tgt->ltd_index,
2530                                       LCK_EX, MDS_INODELOCK_ELC,
2531                                       MF_MDC_CANCEL_FID3);
2532                 if (rc != 0)
2533                         RETURN(rc);
2534         }
2535
2536         op_data->op_name = old;
2537         op_data->op_namelen = oldlen;
2538 retry:
2539         sp_tgt = lmv_locate_tgt(lmv, op_data);
2540         if (IS_ERR(sp_tgt))
2541                 RETURN(PTR_ERR(sp_tgt));
2542
2543         /* cancel UPDATE locks of source parent */
2544         rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_index, LCK_EX,
2545                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
2546         if (rc != 0)
2547                 RETURN(rc);
2548
2549         if (fid_is_sane(&op_data->op_fid3)) {
2550                 /* cancel LOOKUP lock of source on source parent */
2551                 if (src_tgt != sp_tgt) {
2552                         rc = lmv_early_cancel(exp, sp_tgt, op_data,
2553                                               tgt->ltd_index, LCK_EX,
2554                                               MDS_INODELOCK_LOOKUP,
2555                                               MF_MDC_CANCEL_FID3);
2556                         if (rc != 0)
2557                                 RETURN(rc);
2558                 }
2559         }
2560
2561 rename:
2562         CDEBUG(D_INODE, "RENAME "DFID"/%.*s to "DFID"/%.*s\n",
2563                 PFID(&op_data->op_fid1), (int)oldlen, old,
2564                 PFID(&op_data->op_fid2), (int)newlen, new);
2565
2566         rc = md_rename(tgt->ltd_exp, op_data, old, oldlen, new, newlen,
2567                         request);
2568         if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) {
2569                 ptlrpc_req_finished(*request);
2570                 *request = NULL;
2571                 goto retry;
2572         }
2573
2574         if (rc && rc != -EXDEV)
2575                 RETURN(rc);
2576
2577         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2578         if (body == NULL)
2579                 RETURN(-EPROTO);
2580
2581         /* Not cross-ref case, just get out of here. */
2582         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2583                 RETURN(rc);
2584
2585         op_data->op_fid4 = body->mbo_fid1;
2586
2587         ptlrpc_req_finished(*request);
2588         *request = NULL;
2589
2590         tgt = lmv_fid2tgt(lmv, &op_data->op_fid4);
2591         if (IS_ERR(tgt))
2592                 RETURN(PTR_ERR(tgt));
2593
2594         if (fid_is_sane(&op_data->op_fid4)) {
2595                 /* cancel LOOKUP lock of target on target parent */
2596                 if (tgt != tp_tgt) {
2597                         rc = lmv_early_cancel(exp, tp_tgt, op_data,
2598                                               tgt->ltd_index, LCK_EX,
2599                                               MDS_INODELOCK_LOOKUP,
2600                                               MF_MDC_CANCEL_FID4);
2601                         if (rc != 0)
2602                                 RETURN(rc);
2603                 }
2604         }
2605
2606         goto rename;
2607 }
2608
2609 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2610                        void *ea, size_t ealen, struct ptlrpc_request **request)
2611 {
2612         struct obd_device *obd = exp->exp_obd;
2613         struct lmv_obd *lmv = &obd->u.lmv;
2614         struct lmv_tgt_desc *tgt;
2615         int rc = 0;
2616
2617         ENTRY;
2618
2619         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x/0x%x\n",
2620                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
2621                op_data->op_xvalid);
2622
2623         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2624         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
2625         if (IS_ERR(tgt))
2626                 RETURN(PTR_ERR(tgt));
2627
2628         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, request);
2629
2630         RETURN(rc);
2631 }
2632
2633 static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
2634                      struct ptlrpc_request **request)
2635 {
2636         struct obd_device *obd = exp->exp_obd;
2637         struct lmv_obd *lmv = &obd->u.lmv;
2638         struct lmv_tgt_desc *tgt;
2639         int rc;
2640
2641         ENTRY;
2642
2643         tgt = lmv_fid2tgt(lmv, fid);
2644         if (IS_ERR(tgt))
2645                 RETURN(PTR_ERR(tgt));
2646
2647         rc = md_fsync(tgt->ltd_exp, fid, request);
2648         RETURN(rc);
2649 }
2650
/* Read position of one stripe while merging a striped directory */
struct stripe_dirent {
	/* page currently loaded for this stripe, NULL when none is held */
	struct page		*sd_page;
	/* address of the dir page inside sd_page */
	struct lu_dirpage	*sd_dp;
	/* next entry of this stripe not handed out yet, NULL if none */
	struct lu_dirent	*sd_ent;
	/* stripe is exhausted, or its read failed and is treated as such */
	bool			 sd_eof;
};
2657
2658 struct lmv_dir_ctxt {
2659         struct lmv_obd          *ldc_lmv;
2660         struct md_op_data       *ldc_op_data;
2661         struct md_readdir_info  *ldc_mrinfo;
2662         __u64                    ldc_hash;
2663         int                      ldc_count;
2664         struct stripe_dirent     ldc_stripes[0];
2665 };
2666
2667 static inline void stripe_dirent_unload(struct stripe_dirent *stripe)
2668 {
2669         if (stripe->sd_page) {
2670                 kunmap(stripe->sd_page);
2671                 put_page(stripe->sd_page);
2672                 stripe->sd_page = NULL;
2673                 stripe->sd_ent = NULL;
2674         }
2675 }
2676
2677 static inline void put_lmv_dir_ctxt(struct lmv_dir_ctxt *ctxt)
2678 {
2679         int i;
2680
2681         for (i = 0; i < ctxt->ldc_count; i++)
2682                 stripe_dirent_unload(&ctxt->ldc_stripes[i]);
2683 }
2684
2685 /* if @ent is dummy, or . .., get next */
2686 static struct lu_dirent *stripe_dirent_get(struct lmv_dir_ctxt *ctxt,
2687                                            struct lu_dirent *ent,
2688                                            int stripe_index)
2689 {
2690         for (; ent; ent = lu_dirent_next(ent)) {
2691                 /* Skip dummy entry */
2692                 if (le16_to_cpu(ent->lde_namelen) == 0)
2693                         continue;
2694
2695                 /* skip . and .. for other stripes */
2696                 if (stripe_index &&
2697                     (strncmp(ent->lde_name, ".",
2698                              le16_to_cpu(ent->lde_namelen)) == 0 ||
2699                      strncmp(ent->lde_name, "..",
2700                              le16_to_cpu(ent->lde_namelen)) == 0))
2701                         continue;
2702
2703                 if (le64_to_cpu(ent->lde_hash) >= ctxt->ldc_hash)
2704                         break;
2705         }
2706
2707         return ent;
2708 }
2709
2710 static struct lu_dirent *stripe_dirent_load(struct lmv_dir_ctxt *ctxt,
2711                                             struct stripe_dirent *stripe,
2712                                             int stripe_index)
2713 {
2714         struct md_op_data *op_data = ctxt->ldc_op_data;
2715         struct lmv_oinfo *oinfo;
2716         struct lu_fid fid = op_data->op_fid1;
2717         struct inode *inode = op_data->op_data;
2718         struct lmv_tgt_desc *tgt;
2719         struct lu_dirent *ent = stripe->sd_ent;
2720         __u64 hash = ctxt->ldc_hash;
2721         int rc = 0;
2722
2723         ENTRY;
2724
2725         LASSERT(stripe == &ctxt->ldc_stripes[stripe_index]);
2726         LASSERT(!ent);
2727
2728         do {
2729                 if (stripe->sd_page) {
2730                         __u64 end = le64_to_cpu(stripe->sd_dp->ldp_hash_end);
2731
2732                         /* @hash should be the last dirent hash */
2733                         LASSERTF(hash <= end,
2734                                  "ctxt@%p stripe@%p hash %llx end %llx\n",
2735                                  ctxt, stripe, hash, end);
2736                         /* unload last page */
2737                         stripe_dirent_unload(stripe);
2738                         /* eof */
2739                         if (end == MDS_DIR_END_OFF) {
2740                                 stripe->sd_eof = true;
2741                                 break;
2742                         }
2743                         hash = end;
2744                 }
2745
2746                 oinfo = &op_data->op_mea1->lsm_md_oinfo[stripe_index];
2747                 if (!oinfo->lmo_root) {
2748                         rc = -ENOENT;
2749                         break;
2750                 }
2751
2752                 tgt = lmv_tgt(ctxt->ldc_lmv, oinfo->lmo_mds);
2753                 if (!tgt) {
2754                         rc = -ENODEV;
2755                         break;
2756                 }
2757
2758                 /* op_data is shared by stripes, reset after use */
2759                 op_data->op_fid1 = oinfo->lmo_fid;
2760                 op_data->op_fid2 = oinfo->lmo_fid;
2761                 op_data->op_data = oinfo->lmo_root;
2762
2763                 rc = md_read_page(tgt->ltd_exp, op_data, ctxt->ldc_mrinfo, hash,
2764                                   &stripe->sd_page);
2765
2766                 op_data->op_fid1 = fid;
2767                 op_data->op_fid2 = fid;
2768                 op_data->op_data = inode;
2769
2770                 if (rc)
2771                         break;
2772
2773                 stripe->sd_dp = page_address(stripe->sd_page);
2774                 ent = stripe_dirent_get(ctxt, lu_dirent_start(stripe->sd_dp),
2775                                         stripe_index);
2776                 /* in case a page filled with ., .. and dummy, read next */
2777         } while (!ent);
2778
2779         stripe->sd_ent = ent;
2780         if (rc) {
2781                 LASSERT(!ent);
2782                 /* treat error as eof, so dir can be partially accessed */
2783                 stripe->sd_eof = true;
2784                 ctxt->ldc_mrinfo->mr_partial_readdir_rc = rc;
2785                 LCONSOLE_WARN("dir "DFID" stripe %d readdir failed: %d, "
2786                               "directory is partially accessed!\n",
2787                               PFID(&ctxt->ldc_op_data->op_fid1), stripe_index,
2788                               rc);
2789         }
2790
2791         RETURN(ent);
2792 }
2793
2794 static int lmv_file_resync(struct obd_export *exp, struct md_op_data *data)
2795 {
2796         struct obd_device *obd = exp->exp_obd;
2797         struct lmv_obd *lmv = &obd->u.lmv;
2798         struct lmv_tgt_desc *tgt;
2799         int rc;
2800
2801         ENTRY;
2802
2803         rc = lmv_check_connect(obd);
2804         if (rc != 0)
2805                 RETURN(rc);
2806
2807         tgt = lmv_fid2tgt(lmv, &data->op_fid1);
2808         if (IS_ERR(tgt))
2809                 RETURN(PTR_ERR(tgt));
2810
2811         data->op_flags |= MF_MDC_CANCEL_FID1;
2812         rc = md_file_resync(tgt->ltd_exp, data);
2813         RETURN(rc);
2814 }
2815
2816 /**
2817  * Get dirent with the closest hash for striped directory
2818  *
2819  * This function will search the dir entry, whose hash value is the
2820  * closest(>=) to hash from all of sub-stripes, and it is only being called
2821  * for striped directory.
2822  *
2823  * \param[in] ctxt              dir read context
2824  *
2825  * \retval                      dirent get the entry successfully
2826  *                              NULL does not get the entry, normally it means
2827  *                              it reaches the end of the directory, while read
2828  *                              stripe dirent error is ignored to allow partial
2829  *                              access.
2830  */
2831 static struct lu_dirent *lmv_dirent_next(struct lmv_dir_ctxt *ctxt)
2832 {
2833         struct stripe_dirent *stripe;
2834         struct lu_dirent *ent = NULL;
2835         int i;
2836         int min = -1;
2837
2838         /* TODO: optimize with k-way merge sort */
2839         for (i = 0; i < ctxt->ldc_count; i++) {
2840                 stripe = &ctxt->ldc_stripes[i];
2841                 if (stripe->sd_eof)
2842                         continue;
2843
2844                 if (!stripe->sd_ent) {
2845                         stripe_dirent_load(ctxt, stripe, i);
2846                         if (!stripe->sd_ent) {
2847                                 LASSERT(stripe->sd_eof);
2848                                 continue;
2849                         }
2850                 }
2851
2852                 if (min == -1 ||
2853                     le64_to_cpu(ctxt->ldc_stripes[min].sd_ent->lde_hash) >
2854                     le64_to_cpu(stripe->sd_ent->lde_hash)) {
2855                         min = i;
2856                         if (le64_to_cpu(stripe->sd_ent->lde_hash) ==
2857                             ctxt->ldc_hash)
2858                                 break;
2859                 }
2860         }
2861
2862         if (min != -1) {
2863                 stripe = &ctxt->ldc_stripes[min];
2864                 ent = stripe->sd_ent;
2865                 /* pop found dirent */
2866                 stripe->sd_ent = stripe_dirent_get(ctxt, lu_dirent_next(ent),
2867                                                    min);
2868         }
2869
2870         return ent;
2871 }
2872
2873 /**
2874  * Build dir entry page for striped directory
2875  *
2876  * This function gets one entry by @offset from a striped directory. It will
2877  * read entries from all of stripes, and choose one closest to the required
2878  * offset(&offset). A few notes
2879  * 1. skip . and .. for non-zero stripes, because there can only have one .
2880  * and .. in a directory.
2881  * 2. op_data will be shared by all of stripes, instead of allocating new
2882  * one, so need to restore before reusing.
2883  *
2884  * \param[in] exp       obd export refer to LMV
2885  * \param[in] op_data   hold those MD parameters of read_entry
2886  * \param[in] mrinfo    ldlm callback being used in enqueue in mdc_read_entry,
2887  *                      and partial readdir result will be stored in it.
2888  * \param[in] offset    starting hash offset
2889  * \param[out] ppage    the page holding the entry. Note: because the entry
2890  *                      will be accessed in upper layer, so we need hold the
2891  *                      page until the usages of entry is finished, see
2892  *                      ll_dir_entry_next.
2893  *
2894  * retval               =0 if get entry successfully
2895  *                      <0 cannot get entry
2896  */
2897 static int lmv_striped_read_page(struct obd_export *exp,
2898                                  struct md_op_data *op_data,
2899                                  struct md_readdir_info *mrinfo, __u64 offset,
2900                                  struct page **ppage)
2901 {
2902         struct page *page = NULL;
2903         struct lu_dirpage *dp;
2904         void *start;
2905         struct lu_dirent *ent;
2906         struct lu_dirent *last_ent;
2907         int stripe_count;
2908         struct lmv_dir_ctxt *ctxt;
2909         struct lu_dirent *next = NULL;
2910         __u16 ent_size;
2911         size_t left_bytes;
2912         int rc = 0;
2913         ENTRY;
2914
2915         /* Allocate a page and read entries from all of stripes and fill
2916          * the page by hash order */
2917         page = alloc_page(GFP_KERNEL);
2918         if (!page)
2919                 RETURN(-ENOMEM);
2920
2921         /* Initialize the entry page */
2922         dp = kmap(page);
2923         memset(dp, 0, sizeof(*dp));
2924         dp->ldp_hash_start = cpu_to_le64(offset);
2925
2926         start = dp + 1;
2927         left_bytes = PAGE_SIZE - sizeof(*dp);
2928         ent = start;
2929         last_ent = ent;
2930
2931         /* initalize dir read context */
2932         stripe_count = op_data->op_mea1->lsm_md_stripe_count;
2933         OBD_ALLOC(ctxt, offsetof(typeof(*ctxt), ldc_stripes[stripe_count]));
2934         if (!ctxt)
2935                 GOTO(free_page, rc = -ENOMEM);
2936         ctxt->ldc_lmv = &exp->exp_obd->u.lmv;
2937         ctxt->ldc_op_data = op_data;
2938         ctxt->ldc_mrinfo = mrinfo;
2939         ctxt->ldc_hash = offset;
2940         ctxt->ldc_count = stripe_count;
2941
2942         while (1) {
2943                 next = lmv_dirent_next(ctxt);
2944
2945                 /* end of directory */
2946                 if (!next) {
2947                         ctxt->ldc_hash = MDS_DIR_END_OFF;
2948                         break;
2949                 }
2950                 ctxt->ldc_hash = le64_to_cpu(next->lde_hash);
2951
2952                 ent_size = le16_to_cpu(next->lde_reclen);
2953
2954                 /* the last entry lde_reclen is 0, but it might not be the last
2955                  * one of this temporay dir page */
2956                 if (!ent_size)
2957                         ent_size = lu_dirent_calc_size(
2958                                         le16_to_cpu(next->lde_namelen),
2959                                         le32_to_cpu(next->lde_attrs));
2960                 /* page full */
2961                 if (ent_size > left_bytes)
2962                         break;
2963
2964                 memcpy(ent, next, ent_size);
2965
2966                 /* Replace . with master FID and Replace .. with the parent FID
2967                  * of master object */
2968                 if (strncmp(ent->lde_name, ".",
2969                             le16_to_cpu(ent->lde_namelen)) == 0 &&
2970                     le16_to_cpu(ent->lde_namelen) == 1)
2971                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid1);
2972                 else if (strncmp(ent->lde_name, "..",
2973                                    le16_to_cpu(ent->lde_namelen)) == 0 &&
2974                            le16_to_cpu(ent->lde_namelen) == 2)
2975                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
2976
2977                 CDEBUG(D_INODE, "entry %.*s hash %#llx\n",
2978                        le16_to_cpu(ent->lde_namelen), ent->lde_name,
2979                        le64_to_cpu(ent->lde_hash));
2980
2981                 left_bytes -= ent_size;
2982                 ent->lde_reclen = cpu_to_le16(ent_size);
2983                 last_ent = ent;
2984                 ent = (void *)ent + ent_size;
2985         };
2986
2987         last_ent->lde_reclen = 0;
2988
2989         if (ent == start)
2990                 dp->ldp_flags |= LDF_EMPTY;
2991         else if (ctxt->ldc_hash == le64_to_cpu(last_ent->lde_hash))
2992                 dp->ldp_flags |= LDF_COLLIDE;
2993         dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2994         dp->ldp_hash_end = cpu_to_le64(ctxt->ldc_hash);
2995
2996         put_lmv_dir_ctxt(ctxt);
2997         OBD_FREE(ctxt, offsetof(typeof(*ctxt), ldc_stripes[stripe_count]));
2998
2999         *ppage = page;
3000
3001         RETURN(0);
3002
3003 free_page:
3004         kunmap(page);
3005         __free_page(page);
3006
3007         return rc;
3008 }
3009
3010 static int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
3011                          struct md_readdir_info *mrinfo, __u64 offset,
3012                          struct page **ppage)
3013 {
3014         struct obd_device *obd = exp->exp_obd;
3015         struct lmv_obd *lmv = &obd->u.lmv;
3016         struct lmv_tgt_desc *tgt;
3017         int rc;
3018
3019         ENTRY;
3020
3021         if (unlikely(lmv_dir_foreign(op_data->op_mea1)))
3022                 RETURN(-ENODATA);
3023
3024         if (unlikely(lmv_dir_striped(op_data->op_mea1))) {
3025                 rc = lmv_striped_read_page(exp, op_data, mrinfo, offset, ppage);
3026                 RETURN(rc);
3027         }
3028
3029         tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
3030         if (IS_ERR(tgt))
3031                 RETURN(PTR_ERR(tgt));
3032
3033         rc = md_read_page(tgt->ltd_exp, op_data, mrinfo, offset, ppage);
3034
3035         RETURN(rc);
3036 }
3037
3038 /**
3039  * Unlink a file/directory
3040  *
3041  * Unlink a file or directory under the parent dir. The unlink request
3042  * usually will be sent to the MDT where the child is located, but if
3043  * the client does not have the child FID then request will be sent to the
3044  * MDT where the parent is located.
3045  *
3046  * If the parent is a striped directory then it also needs to locate which
3047  * stripe the name of the child is located, and replace the parent FID
3048  * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
3049  * it will walk through all of sub-stripes until the child is being
3050  * unlinked finally.
3051  *
3052  * \param[in] exp       export refer to LMV
3053  * \param[in] op_data   different parameters transferred beween client
3054  *                      MD stacks, name, namelen, FIDs etc.
3055  *                      op_fid1 is the parent FID, op_fid2 is the child
3056  *                      FID.
3057  * \param[out] request  point to the request of unlink.
3058  *
3059  * retval               0 if succeed
3060  *                      negative errno if failed.
3061  */
3062 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
3063                       struct ptlrpc_request **request)
3064 {
3065         struct obd_device *obd = exp->exp_obd;
3066         struct lmv_obd *lmv = &obd->u.lmv;
3067         struct lmv_tgt_desc *tgt;
3068         struct lmv_tgt_desc *parent_tgt;
3069         struct mdt_body *body;
3070         int rc;
3071
3072         ENTRY;
3073
3074         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
3075         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
3076         op_data->op_cap = current_cap();
3077
3078 retry:
3079         parent_tgt = lmv_locate_tgt(lmv, op_data);
3080         if (IS_ERR(parent_tgt))
3081                 RETURN(PTR_ERR(parent_tgt));
3082
3083         if (likely(!fid_is_zero(&op_data->op_fid2))) {
3084                 tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
3085                 if (IS_ERR(tgt))
3086                         RETURN(PTR_ERR(tgt));
3087         } else {
3088                 tgt = parent_tgt;
3089         }
3090
3091         /*
3092          * If child's fid is given, cancel unused locks for it if it is from
3093          * another export than parent.
3094          *
3095          * LOOKUP lock for child (fid3) should also be cancelled on parent
3096          * tgt_tgt in mdc_unlink().
3097          */
3098         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
3099
3100         if (parent_tgt != tgt)
3101                 rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_index,
3102                                       LCK_EX, MDS_INODELOCK_LOOKUP,
3103                                       MF_MDC_CANCEL_FID3);
3104
3105         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_index, LCK_EX,
3106                               MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
3107         if (rc)
3108                 RETURN(rc);
3109
3110         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%u\n",
3111                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
3112                tgt->ltd_index);
3113
3114         rc = md_unlink(tgt->ltd_exp, op_data, request);
3115         if (rc == -ENOENT && lmv_dir_retry_check_update(op_data)) {
3116                 ptlrpc_req_finished(*request);
3117                 *request = NULL;
3118                 goto retry;
3119         }
3120
3121         if (rc != -EREMOTE)
3122                 RETURN(rc);
3123
3124         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
3125         if (body == NULL)
3126                 RETURN(-EPROTO);
3127
3128         /* Not cross-ref case, just get out of here. */
3129         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
3130                 RETURN(rc);
3131
3132         /* This is a remote object, try remote MDT. */
3133         op_data->op_fid2 = body->mbo_fid1;
3134         ptlrpc_req_finished(*request);
3135         *request = NULL;
3136
3137         tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
3138         if (IS_ERR(tgt))
3139                 RETURN(PTR_ERR(tgt));
3140
3141         goto retry;
3142 }
3143
3144 static int lmv_precleanup(struct obd_device *obd)
3145 {
3146         ENTRY;
3147         libcfs_kkuc_group_rem(&obd->obd_uuid, 0, KUC_GRP_HSM);
3148         RETURN(0);
3149 }
3150
3151 /**
3152  * Get by key a value associated with a LMV device.
3153  *
3154  * Dispatch request to lower-layer devices as needed.
3155  *
3156  * \param[in] env               execution environment for this thread
3157  * \param[in] exp               export for the LMV device
3158  * \param[in] keylen            length of key identifier
3159  * \param[in] key               identifier of key to get value for
3160  * \param[in] vallen            size of \a val
3161  * \param[out] val              pointer to storage location for value
3162  * \param[in] lsm               optional striping metadata of object
3163  *
3164  * \retval 0            on success
3165  * \retval negative     negated errno on failure
3166  */
3167 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
3168                         __u32 keylen, void *key, __u32 *vallen, void *val)
3169 {
3170         struct obd_device *obd;
3171         struct lmv_obd *lmv;
3172         struct lu_tgt_desc *tgt;
3173         int rc = 0;
3174
3175         ENTRY;
3176
3177         obd = class_exp2obd(exp);
3178         if (obd == NULL) {
3179                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
3180                        exp->exp_handle.h_cookie);
3181                 RETURN(-EINVAL);
3182         }
3183
3184         lmv = &obd->u.lmv;
3185         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
3186                 LASSERT(*vallen == sizeof(__u32));
3187                 lmv_foreach_connected_tgt(lmv, tgt) {
3188                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
3189                                           vallen, val))
3190                                 RETURN(0);
3191                 }
3192                 RETURN(-EINVAL);
3193         } else if (KEY_IS(KEY_MAX_EASIZE) ||
3194                    KEY_IS(KEY_DEFAULT_EASIZE) ||
3195                    KEY_IS(KEY_CONN_DATA)) {
3196                 /*
3197                  * Forwarding this request to first MDS, it should know LOV
3198                  * desc.
3199                  */
3200                 tgt = lmv_tgt(lmv, 0);
3201                 if (!tgt)
3202                         RETURN(-ENODEV);
3203
3204                 rc = obd_get_info(env, tgt->ltd_exp, keylen, key, vallen, val);
3205                 if (!rc && KEY_IS(KEY_CONN_DATA))
3206                         exp->exp_connect_data = *(struct obd_connect_data *)val;
3207                 RETURN(rc);
3208         } else if (KEY_IS(KEY_TGT_COUNT)) {
3209                 *((int *)val) = lmv->lmv_mdt_descs.ltd_tgts_size;
3210                 RETURN(0);
3211         }
3212
3213         CDEBUG(D_IOCTL, "Invalid key\n");
3214         RETURN(-EINVAL);
3215 }
3216
3217 static int lmv_rmfid(struct obd_export *exp, struct fid_array *fa,
3218                      int *__rcs, struct ptlrpc_request_set *_set)
3219 {
3220         struct obd_device *obd = class_exp2obd(exp);
3221         struct ptlrpc_request_set *set = _set;
3222         struct lmv_obd *lmv = &obd->u.lmv;
3223         int tgt_count = lmv->lmv_mdt_count;
3224         struct lu_tgt_desc *tgt;
3225         struct fid_array *fat, **fas = NULL;
3226         int i, rc, **rcs = NULL;
3227
3228         if (!set) {
3229                 set = ptlrpc_prep_set();
3230                 if (!set)
3231                         RETURN(-ENOMEM);
3232         }
3233
3234         /* split FIDs by targets */
3235         OBD_ALLOC_PTR_ARRAY(fas, tgt_count);
3236         if (fas == NULL)
3237                 GOTO(out, rc = -ENOMEM);
3238         OBD_ALLOC_PTR_ARRAY(rcs, tgt_count);
3239         if (rcs == NULL)
3240                 GOTO(out_fas, rc = -ENOMEM);
3241
3242         for (i = 0; i < fa->fa_nr; i++) {
3243                 unsigned int idx;
3244
3245                 rc = lmv_fld_lookup(lmv, &fa->fa_fids[i], &idx);
3246                 if (rc) {
3247                         CDEBUG(D_OTHER, "can't lookup "DFID": rc = %d\n",
3248                                PFID(&fa->fa_fids[i]), rc);
3249                         continue;
3250                 }
3251                 LASSERT(idx < tgt_count);
3252                 if (!fas[idx])
3253                         OBD_ALLOC(fas[idx], offsetof(struct fid_array,
3254                                   fa_fids[fa->fa_nr]));
3255                 if (!fas[idx])
3256                         GOTO(out, rc = -ENOMEM);
3257                 if (!rcs[idx])
3258                         OBD_ALLOC_PTR_ARRAY(rcs[idx], fa->fa_nr);
3259                 if (!rcs[idx])
3260                         GOTO(out, rc = -ENOMEM);
3261
3262                 fat = fas[idx];
3263                 fat->fa_fids[fat->fa_nr++] = fa->fa_fids[i];
3264         }
3265
3266         lmv_foreach_connected_tgt(lmv, tgt) {
3267                 fat = fas[tgt->ltd_index];
3268                 if (!fat || fat->fa_nr == 0)
3269                         continue;
3270                 rc = md_rmfid(tgt->ltd_exp, fat, rcs[tgt->ltd_index], set);
3271         }
3272
3273         rc = ptlrpc_set_wait(NULL, set);
3274         if (rc == 0) {
3275                 int j = 0;
3276                 for (i = 0; i < tgt_count; i++) {
3277                         fat = fas[i];
3278                         if (!fat || fat->fa_nr == 0)
3279                                 continue;
3280                         /* copy FIDs back */
3281                         memcpy(fa->fa_fids + j, fat->fa_fids,
3282                                fat->fa_nr * sizeof(struct lu_fid));
3283                         /* copy rcs back */
3284                         memcpy(__rcs + j, rcs[i], fat->fa_nr * sizeof(**rcs));
3285                         j += fat->fa_nr;
3286                 }
3287         }
3288         if (set != _set)
3289                 ptlrpc_set_destroy(set);
3290
3291 out:
3292         for (i = 0; i < tgt_count; i++) {
3293                 if (fas && fas[i])
3294                         OBD_FREE(fas[i], offsetof(struct fid_array,
3295                                                 fa_fids[fa->fa_nr]));
3296                 if (rcs && rcs[i])
3297                         OBD_FREE_PTR_ARRAY(rcs[i], fa->fa_nr);
3298         }
3299         if (rcs)
3300                 OBD_FREE_PTR_ARRAY(rcs, tgt_count);
3301 out_fas:
3302         if (fas)
3303                 OBD_FREE_PTR_ARRAY(fas, tgt_count);
3304
3305         RETURN(rc);
3306 }
3307
3308 /**
3309  * Asynchronously set by key a value associated with a LMV device.
3310  *
3311  * Dispatch request to lower-layer devices as needed.
3312  *
3313  * \param[in] env       execution environment for this thread
3314  * \param[in] exp       export for the LMV device
3315  * \param[in] keylen    length of key identifier
3316  * \param[in] key       identifier of key to store value for
3317  * \param[in] vallen    size of value to store
3318  * \param[in] val       pointer to data to be stored
3319  * \param[in] set       optional list of related ptlrpc requests
3320  *
3321  * \retval 0            on success
3322  * \retval negative     negated errno on failure
3323  */
3324 static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
3325                               __u32 keylen, void *key, __u32 vallen, void *val,
3326                               struct ptlrpc_request_set *set)
3327 {
3328         struct lmv_tgt_desc *tgt;
3329         struct obd_device *obd;
3330         struct lmv_obd *lmv;
3331         int rc = 0;
3332         ENTRY;
3333
3334         obd = class_exp2obd(exp);
3335         if (obd == NULL) {
3336                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
3337                        exp->exp_handle.h_cookie);
3338                 RETURN(-EINVAL);
3339         }
3340         lmv = &obd->u.lmv;
3341
3342         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
3343             KEY_IS(KEY_DEFAULT_EASIZE)) {
3344                 int err = 0;
3345
3346                 lmv_foreach_connected_tgt(lmv, tgt) {
3347                         err = obd_set_info_async(env, tgt->ltd_exp,
3348                                                  keylen, key, vallen, val, set);
3349                         if (err && rc == 0)
3350                                 rc = err;
3351                 }
3352
3353                 RETURN(rc);
3354         }
3355
3356         RETURN(-EINVAL);
3357 }
3358
3359 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
3360                             const struct lmv_mds_md_v1 *lmm1)
3361 {
3362         struct lmv_obd  *lmv = &exp->exp_obd->u.lmv;
3363         int             stripe_count;
3364         int             cplen;
3365         int             i;
3366         int             rc = 0;
3367         ENTRY;
3368
3369         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
3370         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
3371         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
3372         if (CFS_FAIL_CHECK(OBD_FAIL_LMV_UNKNOWN_STRIPE))
3373                 lsm->lsm_md_hash_type = cfs_fail_val ?: LMV_HASH_TYPE_UNKNOWN;
3374         else
3375                 lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
3376         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
3377         lsm->lsm_md_migrate_offset = le32_to_cpu(lmm1->lmv_migrate_offset);
3378         lsm->lsm_md_migrate_hash = le32_to_cpu(lmm1->lmv_migrate_hash);
3379         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
3380                         sizeof(lsm->lsm_md_pool_name));
3381
3382         if (cplen >= sizeof(lsm->lsm_md_pool_name))
3383                 RETURN(-E2BIG);
3384
3385         CDEBUG(D_INFO, "unpack lsm count %d/%d, master %d hash_type %#x/%#x "
3386                "layout_version %d\n", lsm->lsm_md_stripe_count,
3387                lsm->lsm_md_migrate_offset, lsm->lsm_md_master_mdt_index,
3388                lsm->lsm_md_hash_type, lsm->lsm_md_migrate_hash,
3389                lsm->lsm_md_layout_version);
3390
3391         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
3392         for (i = 0; i < stripe_count; i++) {
3393                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
3394                               &lmm1->lmv_stripe_fids[i]);
3395                 /*
3396                  * set default value -1, so lmv_locate_tgt() knows this stripe
3397                  * target is not initialized.
3398                  */
3399                 lsm->lsm_md_oinfo[i].lmo_mds = LMV_OFFSET_DEFAULT;
3400                 if (!fid_is_sane(&lsm->lsm_md_oinfo[i].lmo_fid))
3401                         continue;
3402
3403                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
3404                                     &lsm->lsm_md_oinfo[i].lmo_mds);
3405                 if (rc == -ENOENT)
3406                         continue;
3407
3408                 if (rc)
3409                         RETURN(rc);
3410
3411                 CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
3412                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
3413         }
3414
3415         RETURN(rc);
3416 }
3417
3418 static inline int lmv_unpack_user_md(struct obd_export *exp,
3419                                      struct lmv_stripe_md *lsm,
3420                                      const struct lmv_user_md *lmu)
3421 {
3422         lsm->lsm_md_magic = le32_to_cpu(lmu->lum_magic);
3423         lsm->lsm_md_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
3424         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmu->lum_stripe_offset);
3425         lsm->lsm_md_hash_type = le32_to_cpu(lmu->lum_hash_type);
3426         lsm->lsm_md_max_inherit = lmu->lum_max_inherit;
3427         lsm->lsm_md_max_inherit_rr = lmu->lum_max_inherit_rr;
3428         lsm->lsm_md_pool_name[LOV_MAXPOOLNAME] = 0;
3429
3430         return 0;
3431 }
3432
3433 static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
3434                         const union lmv_mds_md *lmm, size_t lmm_size)
3435 {
3436         struct lmv_stripe_md     *lsm;
3437         int                      lsm_size;
3438         int                      rc;
3439         bool                     allocated = false;
3440         ENTRY;
3441
3442         LASSERT(lsmp != NULL);
3443
3444         lsm = *lsmp;
3445         /* Free memmd */
3446         if (lsm != NULL && lmm == NULL) {
3447                 int i;
3448                 struct lmv_foreign_md *lfm = (struct lmv_foreign_md *)lsm;
3449
3450                 if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) {
3451                         size_t lfm_size;
3452
3453                         lfm_size = lfm->lfm_length + offsetof(typeof(*lfm),
3454                                                               lfm_value[0]);
3455                         OBD_FREE_LARGE(lfm, lfm_size);
3456                         RETURN(0);
3457                 }
3458
3459                 if (lmv_dir_striped(lsm)) {
3460                         for (i = 0; i < lsm->lsm_md_stripe_count; i++)
3461                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
3462                         lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
3463                 } else {
3464                         lsm_size = lmv_stripe_md_size(0);
3465                 }
3466                 OBD_FREE(lsm, lsm_size);
3467                 *lsmp = NULL;
3468                 RETURN(0);
3469         }
3470
3471         /* foreign lmv case */
3472         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_FOREIGN) {
3473                 struct lmv_foreign_md *lfm = (struct lmv_foreign_md *)lsm;
3474
3475                 if (lfm == NULL) {
3476                         OBD_ALLOC_LARGE(lfm, lmm_size);
3477                         if (lfm == NULL)
3478                                 RETURN(-ENOMEM);
3479                         *lsmp = (struct lmv_stripe_md *)lfm;
3480                 }
3481                 lfm->lfm_magic = le32_to_cpu(lmm->lmv_foreign_md.lfm_magic);
3482                 lfm->lfm_length = le32_to_cpu(lmm->lmv_foreign_md.lfm_length);
3483                 lfm->lfm_type = le32_to_cpu(lmm->lmv_foreign_md.lfm_type);
3484                 lfm->lfm_flags = le32_to_cpu(lmm->lmv_foreign_md.lfm_flags);
3485                 memcpy(&lfm->lfm_value, &lmm->lmv_foreign_md.lfm_value,
3486                        lfm->lfm_length);
3487                 RETURN(lmm_size);
3488         }
3489
3490         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
3491                 RETURN(-EPERM);
3492
3493         /* Unpack memmd */
3494         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
3495             le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
3496                 CERROR("%s: invalid lmv magic %x: rc = %d\n",
3497                        exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
3498                        -EIO);
3499                 RETURN(-EIO);
3500         }
3501
3502         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
3503                 lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
3504         else
3505                 /**
3506                  * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
3507                  * stripecount should be 0 then.
3508                  */
3509                 lsm_size = lmv_stripe_md_size(0);
3510
3511         if (lsm == NULL) {
3512                 OBD_ALLOC(lsm, lsm_size);
3513                 if (lsm == NULL)
3514                         RETURN(-ENOMEM);
3515                 allocated = true;
3516                 *lsmp = lsm;
3517         }
3518
3519         switch (le32_to_cpu(lmm->lmv_magic)) {
3520         case LMV_MAGIC_V1:
3521                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
3522                 break;
3523         case LMV_USER_MAGIC:
3524                 rc = lmv_unpack_user_md(exp, lsm, &lmm->lmv_user_md);
3525                 break;
3526         default:
3527                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
3528                        le32_to_cpu(lmm->lmv_magic));
3529                 rc = -EINVAL;
3530                 break;
3531         }
3532
3533         if (rc != 0 && allocated) {
3534                 OBD_FREE(lsm, lsm_size);
3535                 *lsmp = NULL;
3536                 lsm_size = rc;
3537         }
3538         RETURN(lsm_size);
3539 }
3540
3541 void lmv_free_memmd(struct lmv_stripe_md *lsm)
3542 {
3543         lmv_unpackmd(NULL, &lsm, NULL, 0);
3544 }
3545 EXPORT_SYMBOL(lmv_free_memmd);
3546
3547 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
3548                              union ldlm_policy_data *policy,
3549                              enum ldlm_mode mode, enum ldlm_cancel_flags flags,
3550                              void *opaque)
3551 {
3552         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
3553         struct lu_tgt_desc *tgt;
3554         int err;
3555         int rc = 0;
3556
3557         ENTRY;
3558
3559         LASSERT(fid != NULL);
3560
3561         lmv_foreach_connected_tgt(lmv, tgt) {
3562                 if (!tgt->ltd_active)
3563                         continue;
3564
3565                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
3566                                        opaque);
3567                 if (!rc)
3568                         rc = err;
3569         }
3570         RETURN(rc);
3571 }
3572
3573 static int lmv_set_lock_data(struct obd_export *exp,
3574                              const struct lustre_handle *lockh,
3575                              void *data, __u64 *bits)
3576 {
3577         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
3578         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3579         int rc;
3580
3581         ENTRY;
3582
3583         if (tgt == NULL || tgt->ltd_exp == NULL)
3584                 RETURN(-EINVAL);
3585         rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
3586         RETURN(rc);
3587 }
3588
3589 static enum ldlm_mode
3590 lmv_lock_match(struct obd_export *exp, __u64 flags,
3591                const struct lu_fid *fid, enum ldlm_type type,
3592                union ldlm_policy_data *policy,
3593                enum ldlm_mode mode, struct lustre_handle *lockh)
3594 {
3595         struct obd_device *obd = exp->exp_obd;
3596         struct lmv_obd *lmv = &obd->u.lmv;
3597         enum ldlm_mode rc;
3598         struct lu_tgt_desc *tgt;
3599         int i;
3600         int index;
3601
3602         ENTRY;
3603
3604         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
3605
3606         /*
3607          * With DNE every object can have two locks in different namespaces:
3608          * lookup lock in space of MDT storing direntry and update/open lock in
3609          * space of MDT storing inode.  Try the MDT that the FID maps to first,
3610          * since this can be easily found, and only try others if that fails.
3611          */
3612         for (i = 0, index = lmv_fid2tgt_index(lmv, fid);
3613              i < lmv->lmv_mdt_descs.ltd_tgts_size;
3614              i++, index = (index + 1) % lmv->lmv_mdt_descs.ltd_tgts_size) {
3615                 if (index < 0) {
3616                         CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
3617                                obd->obd_name, PFID(fid), index);
3618                         index = 0;
3619                 }
3620
3621                 tgt = lmv_tgt(lmv, index);
3622                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
3623                         continue;
3624
3625                 rc = md_lock_match(tgt->ltd_exp, flags, fid, type, policy, mode,
3626                                    lockh);
3627                 if (rc)
3628                         RETURN(rc);
3629         }
3630
3631         RETURN(0);
3632 }
3633
3634 static int
3635 lmv_get_lustre_md(struct obd_export *exp, struct req_capsule *pill,
3636                   struct obd_export *dt_exp, struct obd_export *md_exp,
3637                   struct lustre_md *md)
3638 {
3639         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
3640         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3641
3642         if (!tgt || !tgt->ltd_exp)
3643                 return -EINVAL;
3644
3645         return md_get_lustre_md(tgt->ltd_exp, pill, dt_exp, md_exp, md);
3646 }
3647
3648 static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
3649 {
3650         struct obd_device *obd = exp->exp_obd;
3651         struct lmv_obd *lmv = &obd->u.lmv;
3652         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3653
3654         ENTRY;
3655
3656         if (md->default_lmv) {
3657                 lmv_free_memmd(md->default_lmv);
3658                 md->default_lmv = NULL;
3659         }
3660         if (md->lmv != NULL) {
3661                 lmv_free_memmd(md->lmv);
3662                 md->lmv = NULL;
3663         }
3664         if (!tgt || !tgt->ltd_exp)
3665                 RETURN(-EINVAL);
3666         RETURN(md_free_lustre_md(tgt->ltd_exp, md));
3667 }
3668
3669 static int lmv_set_open_replay_data(struct obd_export *exp,
3670                                     struct obd_client_handle *och,
3671                                     struct lookup_intent *it)
3672 {
3673         struct obd_device *obd = exp->exp_obd;
3674         struct lmv_obd *lmv = &obd->u.lmv;
3675         struct lmv_tgt_desc *tgt;
3676
3677         ENTRY;
3678
3679         tgt = lmv_fid2tgt(lmv, &och->och_fid);
3680         if (IS_ERR(tgt))
3681                 RETURN(PTR_ERR(tgt));
3682
3683         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it));
3684 }
3685
3686 static int lmv_clear_open_replay_data(struct obd_export *exp,
3687                                       struct obd_client_handle *och)
3688 {
3689         struct obd_device *obd = exp->exp_obd;
3690         struct lmv_obd *lmv = &obd->u.lmv;
3691         struct lmv_tgt_desc *tgt;
3692
3693         ENTRY;
3694
3695         tgt = lmv_fid2tgt(lmv, &och->och_fid);
3696         if (IS_ERR(tgt))
3697                 RETURN(PTR_ERR(tgt));
3698
3699         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
3700 }
3701
3702 static int lmv_intent_getattr_async(struct obd_export *exp,
3703                                     struct md_op_item *item)
3704 {
3705         struct md_op_data *op_data = &item->mop_data;
3706         struct obd_device *obd = exp->exp_obd;
3707         struct lmv_obd *lmv = &obd->u.lmv;
3708         struct lmv_tgt_desc *ptgt;
3709         struct lmv_tgt_desc *ctgt;
3710         int rc;
3711
3712         ENTRY;
3713
3714         if (!fid_is_sane(&op_data->op_fid2))
3715                 RETURN(-EINVAL);
3716
3717         ptgt = lmv_locate_tgt(lmv, op_data);
3718         if (IS_ERR(ptgt))
3719                 RETURN(PTR_ERR(ptgt));
3720
3721         ctgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
3722         if (IS_ERR(ctgt))
3723                 RETURN(PTR_ERR(ctgt));
3724
3725         /*
3726          * remote object needs two RPCs to lookup and getattr, considering the
3727          * complexity don't support statahead for now.
3728          */
3729         if (ctgt != ptgt)
3730                 RETURN(-EREMOTE);
3731
3732         rc = md_intent_getattr_async(ptgt->ltd_exp, item);
3733
3734         RETURN(rc);
3735 }
3736
3737 static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
3738                                struct lu_fid *fid, __u64 *bits)
3739 {
3740         struct obd_device *obd = exp->exp_obd;
3741         struct lmv_obd *lmv = &obd->u.lmv;
3742         struct lmv_tgt_desc *tgt;
3743         int rc;
3744
3745         ENTRY;
3746
3747         tgt = lmv_fid2tgt(lmv, fid);
3748         if (IS_ERR(tgt))
3749                 RETURN(PTR_ERR(tgt));
3750
3751         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
3752         RETURN(rc);
3753 }
3754
3755 static int lmv_get_fid_from_lsm(struct obd_export *exp,
3756                                 const struct lmv_stripe_md *lsm,
3757                                 const char *name, int namelen,
3758                                 struct lu_fid *fid)
3759 {
3760         const struct lmv_oinfo *oinfo;
3761
3762         if (!lmv_dir_striped(lsm))
3763                 RETURN(-ESTALE);
3764
3765         oinfo = lsm_name_to_stripe_info(lsm, name, namelen, false);
3766         if (IS_ERR(oinfo))
3767                 return PTR_ERR(oinfo);
3768
3769         *fid = oinfo->lmo_fid;
3770
3771         RETURN(0);
3772 }
3773
3774 /**
3775  * For lmv, only need to send request to master MDT, and the master MDT will
3776  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
3777  * we directly fetch data from the slave MDTs.
3778  */
3779 static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
3780                         struct obd_quotactl *oqctl)
3781 {
3782         struct obd_device *obd = class_exp2obd(exp);
3783         struct lmv_obd *lmv = &obd->u.lmv;
3784         struct lmv_tgt_desc *tgt = lmv_tgt(lmv, 0);
3785         __u64 curspace, curinodes;
3786         int rc = 0;
3787
3788         ENTRY;
3789
3790         if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) {
3791                 CERROR("master lmv inactive\n");
3792                 RETURN(-EIO);
3793         }
3794
3795         if (oqctl->qc_cmd != Q_GETOQUOTA) {
3796                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
3797                 RETURN(rc);
3798         }
3799
3800         curspace = curinodes = 0;
3801         lmv_foreach_connected_tgt(lmv, tgt) {
3802                 int err;
3803
3804                 if (!tgt->ltd_active)
3805                         continue;
3806
3807                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3808                 if (err) {
3809                         CERROR("getquota on mdt %d failed. %d\n",
3810                                tgt->ltd_index, err);
3811                         if (!rc)
3812                                 rc = err;
3813                 } else {
3814                         curspace += oqctl->qc_dqblk.dqb_curspace;
3815                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3816                 }
3817         }
3818         oqctl->qc_dqblk.dqb_curspace = curspace;
3819         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3820
3821         RETURN(rc);
3822 }
3823
3824 static int lmv_merge_attr(struct obd_export *exp,
3825                           const struct lmv_stripe_md *lsm,
3826                           struct cl_attr *attr,
3827                           ldlm_blocking_callback cb_blocking)
3828 {
3829         int rc;
3830         int i;
3831
3832         if (!lmv_dir_striped(lsm))
3833                 return 0;
3834
3835         rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
3836         if (rc < 0)
3837                 return rc;
3838
3839         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3840                 struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
3841
3842                 if (!inode)
3843                         continue;
3844
3845                 CDEBUG(D_INFO,
3846                        "" DFID " size %llu, blocks %llu nlink %u, atime %lld ctime %lld, mtime %lld.\n",
3847                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
3848                        i_size_read(inode), (unsigned long long)inode->i_blocks,
3849                        inode->i_nlink, (s64)inode->i_atime.tv_sec,
3850                        (s64)inode->i_ctime.tv_sec, (s64)inode->i_mtime.tv_sec);
3851
3852                 /* for slave stripe, it needs to subtract nlink for . and .. */
3853                 if (i != 0)
3854                         attr->cat_nlink += inode->i_nlink - 2;
3855                 else
3856                         attr->cat_nlink = inode->i_nlink;
3857
3858                 attr->cat_size += i_size_read(inode);
3859                 attr->cat_blocks += inode->i_blocks;
3860
3861                 if (attr->cat_atime < inode->i_atime.tv_sec)
3862                         attr->cat_atime = inode->i_atime.tv_sec;
3863
3864                 if (attr->cat_ctime < inode->i_ctime.tv_sec)
3865                         attr->cat_ctime = inode->i_ctime.tv_sec;
3866
3867                 if (attr->cat_mtime < inode->i_mtime.tv_sec)
3868                         attr->cat_mtime = inode->i_mtime.tv_sec;
3869         }
3870         return 0;
3871 }
3872
3873 static const struct obd_ops lmv_obd_ops = {
3874         .o_owner                = THIS_MODULE,
3875         .o_setup                = lmv_setup,
3876         .o_cleanup              = lmv_cleanup,
3877         .o_precleanup           = lmv_precleanup,
3878         .o_process_config       = lmv_process_config,
3879         .o_connect              = lmv_connect,
3880         .o_disconnect           = lmv_disconnect,
3881         .o_statfs               = lmv_statfs,
3882         .o_get_info             = lmv_get_info,
3883         .o_set_info_async       = lmv_set_info_async,
3884         .o_notify               = lmv_notify,
3885         .o_get_uuid             = lmv_get_uuid,
3886         .o_fid_alloc            = lmv_fid_alloc,
3887         .o_iocontrol            = lmv_iocontrol,
3888         .o_quotactl             = lmv_quotactl
3889 };
3890
3891 static const struct md_ops lmv_md_ops = {
3892         .m_get_root             = lmv_get_root,
3893         .m_null_inode           = lmv_null_inode,
3894         .m_close                = lmv_close,
3895         .m_create               = lmv_create,
3896         .m_enqueue              = lmv_enqueue,
3897         .m_getattr              = lmv_getattr,
3898         .m_getxattr             = lmv_getxattr,
3899         .m_getattr_name         = lmv_getattr_name,
3900         .m_intent_lock          = lmv_intent_lock,
3901         .m_link                 = lmv_link,
3902         .m_rename               = lmv_rename,
3903         .m_setattr              = lmv_setattr,
3904         .m_setxattr             = lmv_setxattr,
3905         .m_fsync                = lmv_fsync,
3906         .m_file_resync          = lmv_file_resync,
3907         .m_read_page            = lmv_read_page,
3908         .m_unlink               = lmv_unlink,
3909         .m_init_ea_size         = lmv_init_ea_size,
3910         .m_cancel_unused        = lmv_cancel_unused,
3911         .m_set_lock_data        = lmv_set_lock_data,
3912         .m_lock_match           = lmv_lock_match,
3913         .m_get_lustre_md        = lmv_get_lustre_md,
3914         .m_free_lustre_md       = lmv_free_lustre_md,
3915         .m_merge_attr           = lmv_merge_attr,
3916         .m_set_open_replay_data = lmv_set_open_replay_data,
3917         .m_clear_open_replay_data = lmv_clear_open_replay_data,
3918         .m_intent_getattr_async = lmv_intent_getattr_async,
3919         .m_revalidate_lock      = lmv_revalidate_lock,
3920         .m_get_fid_from_lsm     = lmv_get_fid_from_lsm,
3921         .m_unpackmd             = lmv_unpackmd,
3922         .m_rmfid                = lmv_rmfid,
3923 };
3924
3925 static int __init lmv_init(void)
3926 {
3927         return class_register_type(&lmv_obd_ops, &lmv_md_ops, true,
3928                                    LUSTRE_LMV_NAME, NULL);
3929 }
3930
3931 static void __exit lmv_exit(void)
3932 {
3933         class_unregister_type(LUSTRE_LMV_NAME);
3934 }
3935
3936 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3937 MODULE_DESCRIPTION("Lustre Logical Metadata Volume");
3938 MODULE_VERSION(LUSTRE_VERSION_STRING);
3939 MODULE_LICENSE("GPL");
3940
3941 module_init(lmv_init);
3942 module_exit(lmv_exit);