Whamcloud - gitweb
LU-7669 lmv: assume a real connection in lmv_connect()
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/user_namespace.h>
42 #ifdef HAVE_UIDGID_HEADER
43 # include <linux/uidgid.h>
44 #endif
45 #include <linux/slab.h>
46 #include <linux/pagemap.h>
47 #include <linux/mm.h>
48 #include <linux/math64.h>
49 #include <linux/seq_file.h>
50 #include <linux/namei.h>
51
52 #include <lustre/lustre_idl.h>
53 #include <obd_support.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <obd_class.h>
57 #include <lustre_lmv.h>
58 #include <lprocfs_status.h>
59 #include <cl_object.h>
60 #include <lustre_fid.h>
61 #include <lustre_ioctl.h>
62 #include <lustre_kernelcomm.h>
63 #include "lmv_internal.h"
64
65 static int lmv_check_connect(struct obd_device *obd);
66
67 static void lmv_activate_target(struct lmv_obd *lmv,
68                                 struct lmv_tgt_desc *tgt,
69                                 int activate)
70 {
71         if (tgt->ltd_active == activate)
72                 return;
73
74         tgt->ltd_active = activate;
75         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
76
77         tgt->ltd_exp->exp_obd->obd_inactive = !activate;
78 }
79
80 /**
81  * Error codes:
82  *
83  *  -EINVAL  : UUID can't be found in the LMV's target list
84  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
85  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
86  */
87 static int lmv_set_mdc_active(struct lmv_obd *lmv,
88                               const struct obd_uuid *uuid,
89                               int activate)
90 {
91         struct lmv_tgt_desc     *tgt = NULL;
92         struct obd_device       *obd;
93         __u32                    i;
94         int                      rc = 0;
95         ENTRY;
96
97         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
98                         lmv, uuid->uuid, activate);
99
100         spin_lock(&lmv->lmv_lock);
101         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
102                 tgt = lmv->tgts[i];
103                 if (tgt == NULL || tgt->ltd_exp == NULL)
104                         continue;
105
106                 CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n", i,
107                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
108
109                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
110                         break;
111         }
112
113         if (i == lmv->desc.ld_tgt_count)
114                 GOTO(out_lmv_lock, rc = -EINVAL);
115
116         obd = class_exp2obd(tgt->ltd_exp);
117         if (obd == NULL)
118                 GOTO(out_lmv_lock, rc = -ENOTCONN);
119
120         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
121                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
122                obd->obd_type->typ_name, i);
123         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
124
125         if (tgt->ltd_active == activate) {
126                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
127                        activate ? "" : "in");
128                 GOTO(out_lmv_lock, rc);
129         }
130
131         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
132                activate ? "" : "in");
133         lmv_activate_target(lmv, tgt, activate);
134         EXIT;
135
136  out_lmv_lock:
137         spin_unlock(&lmv->lmv_lock);
138         return rc;
139 }
140
141 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
142 {
143         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
144         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
145
146         return (tgt == NULL) ? NULL : obd_get_uuid(tgt->ltd_exp);
147 }
148
149 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
150                       enum obd_notify_event ev, void *data)
151 {
152         struct obd_connect_data *conn_data;
153         struct lmv_obd          *lmv = &obd->u.lmv;
154         struct obd_uuid         *uuid;
155         int                      rc = 0;
156         ENTRY;
157
158         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
159                 CERROR("unexpected notification of %s %s!\n",
160                        watched->obd_type->typ_name,
161                        watched->obd_name);
162                 RETURN(-EINVAL);
163         }
164
165         uuid = &watched->u.cli.cl_target_uuid;
166         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
167                 /*
168                  * Set MDC as active before notifying the observer, so the
169                  * observer can use the MDC normally.
170                  */
171                 rc = lmv_set_mdc_active(lmv, uuid,
172                                         ev == OBD_NOTIFY_ACTIVE);
173                 if (rc) {
174                         CERROR("%sactivation of %s failed: %d\n",
175                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
176                                uuid->uuid, rc);
177                         RETURN(rc);
178                 }
179         } else if (ev == OBD_NOTIFY_OCD) {
180                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
181                 /*
182                  * XXX: Make sure that ocd_connect_flags from all targets are
183                  * the same. Otherwise one of MDTs runs wrong version or
184                  * something like this.  --umka
185                  */
186                 obd->obd_self_export->exp_connect_data = *conn_data;
187         }
188
189         /*
190          * Pass the notification up the chain.
191          */
192         if (obd->obd_observer)
193                 rc = obd_notify(obd->obd_observer, watched, ev, data);
194
195         RETURN(rc);
196 }
197
198 static int lmv_connect(const struct lu_env *env,
199                        struct obd_export **pexp, struct obd_device *obd,
200                        struct obd_uuid *cluuid, struct obd_connect_data *data,
201                        void *localdata)
202 {
203         struct lmv_obd *lmv = &obd->u.lmv;
204         struct lustre_handle conn = { 0 };
205         struct obd_export *exp;
206         int rc;
207         ENTRY;
208
209         rc = class_connect(&conn, obd, cluuid);
210         if (rc) {
211                 CERROR("class_connection() returned %d\n", rc);
212                 RETURN(rc);
213         }
214
215         exp = class_conn2export(&conn);
216
217         lmv->connected = 0;
218         lmv->cluuid = *cluuid;
219         lmv->conn_data = *data;
220
221         if (lmv->targets_proc_entry == NULL) {
222                 lmv->targets_proc_entry = lprocfs_register("target_obds",
223                                                            obd->obd_proc_entry,
224                                                            NULL, NULL);
225                 if (IS_ERR(lmv->targets_proc_entry)) {
226                         CERROR("%s: cannot register "
227                                "/proc/fs/lustre/%s/%s/target_obds\n",
228                                obd->obd_name, obd->obd_type->typ_name,
229                                obd->obd_name);
230                         lmv->targets_proc_entry = NULL;
231                 }
232         }
233
234         rc = lmv_check_connect(obd);
235         if (rc != 0)
236                 GOTO(out_proc, rc);
237
238         *pexp = exp;
239
240         RETURN(rc);
241
242 out_proc:
243         if (lmv->targets_proc_entry != NULL)
244                 lprocfs_remove(&lmv->targets_proc_entry);
245
246         class_disconnect(exp);
247
248         return rc;
249 }
250
251 static int lmv_init_ea_size(struct obd_export *exp, __u32 easize,
252                             __u32 def_easize)
253 {
254         struct obd_device       *obd = exp->exp_obd;
255         struct lmv_obd          *lmv = &obd->u.lmv;
256         __u32                    i;
257         int                      rc = 0;
258         int                      change = 0;
259         ENTRY;
260
261         if (lmv->max_easize < easize) {
262                 lmv->max_easize = easize;
263                 change = 1;
264         }
265         if (lmv->max_def_easize < def_easize) {
266                 lmv->max_def_easize = def_easize;
267                 change = 1;
268         }
269
270         if (change == 0)
271                 RETURN(0);
272
273         if (lmv->connected == 0)
274                 RETURN(0);
275
276         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
277                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
278
279                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
280                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
281                         continue;
282                 }
283
284                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize);
285                 if (rc) {
286                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
287                                " rc = %d\n", obd->obd_name, i, rc);
288                         break;
289                 }
290         }
291         RETURN(rc);
292 }
293
294 #define MAX_STRING_SIZE 128
295
296 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
297 {
298         struct lmv_obd          *lmv = &obd->u.lmv;
299         struct obd_uuid         *cluuid = &lmv->cluuid;
300         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
301         struct obd_device       *mdc_obd;
302         struct obd_export       *mdc_exp;
303         struct lu_fld_target     target;
304         int                      rc;
305         ENTRY;
306
307         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
308                                         &obd->obd_uuid);
309         if (!mdc_obd) {
310                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
311                 RETURN(-EINVAL);
312         }
313
314         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
315                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
316                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
317                 cluuid->uuid);
318
319         if (!mdc_obd->obd_set_up) {
320                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
321                 RETURN(-EINVAL);
322         }
323
324         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
325                          &lmv->conn_data, NULL);
326         if (rc) {
327                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
328                 RETURN(rc);
329         }
330
331         /*
332          * Init fid sequence client for this mdc and add new fld target.
333          */
334         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
335         if (rc)
336                 RETURN(rc);
337
338         target.ft_srv = NULL;
339         target.ft_exp = mdc_exp;
340         target.ft_idx = tgt->ltd_idx;
341
342         fld_client_add_target(&lmv->lmv_fld, &target);
343
344         rc = obd_register_observer(mdc_obd, obd);
345         if (rc) {
346                 obd_disconnect(mdc_exp);
347                 CERROR("target %s register_observer error %d\n",
348                        tgt->ltd_uuid.uuid, rc);
349                 RETURN(rc);
350         }
351
352         if (obd->obd_observer) {
353                 /*
354                  * Tell the observer about the new target.
355                  */
356                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
357                                 OBD_NOTIFY_ACTIVE,
358                                 (void *)(tgt - lmv->tgts[0]));
359                 if (rc) {
360                         obd_disconnect(mdc_exp);
361                         RETURN(rc);
362                 }
363         }
364
365         tgt->ltd_active = 1;
366         tgt->ltd_exp = mdc_exp;
367         lmv->desc.ld_active_tgt_count++;
368
369         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize);
370
371         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
372                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
373                 atomic_read(&obd->obd_refcount));
374
375         if (lmv->targets_proc_entry != NULL) {
376                 struct proc_dir_entry *mdc_symlink;
377
378                 LASSERT(mdc_obd->obd_type != NULL);
379                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
380                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
381                                                   lmv->targets_proc_entry,
382                                                   "../../../%s/%s",
383                                                   mdc_obd->obd_type->typ_name,
384                                                   mdc_obd->obd_name);
385                 if (mdc_symlink == NULL) {
386                         CERROR("cannot register LMV target "
387                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
388                                obd->obd_type->typ_name, obd->obd_name,
389                                mdc_obd->obd_name);
390                 }
391         }
392         RETURN(0);
393 }
394
395 static void lmv_del_target(struct lmv_obd *lmv, int index)
396 {
397         if (lmv->tgts[index] == NULL)
398                 return;
399
400         OBD_FREE_PTR(lmv->tgts[index]);
401         lmv->tgts[index] = NULL;
402         return;
403 }
404
405 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
406                            __u32 index, int gen)
407 {
408         struct obd_device *mdc_obd;
409         struct lmv_obd      *lmv = &obd->u.lmv;
410         struct lmv_tgt_desc *tgt;
411         int                  orig_tgt_count = 0;
412         int                  rc = 0;
413         ENTRY;
414
415         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
416         mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
417                                         &obd->obd_uuid);
418         if (!mdc_obd) {
419                 CERROR("%s: Target %s not attached: rc = %d\n",
420                        obd->obd_name, uuidp->uuid, -EINVAL);
421                 RETURN(-EINVAL);
422         }
423
424         mutex_lock(&lmv->lmv_init_mutex);
425         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
426                 tgt = lmv->tgts[index];
427                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
428                        " rc = %d\n", obd->obd_name,
429                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
430                 mutex_unlock(&lmv->lmv_init_mutex);
431                 RETURN(-EEXIST);
432         }
433
434         if (index >= lmv->tgts_size) {
435                 /* We need to reallocate the lmv target array. */
436                 struct lmv_tgt_desc **newtgts, **old = NULL;
437                 __u32 newsize = 1;
438                 __u32 oldsize = 0;
439
440                 while (newsize < index + 1)
441                         newsize = newsize << 1;
442                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
443                 if (newtgts == NULL) {
444                         mutex_unlock(&lmv->lmv_init_mutex);
445                         RETURN(-ENOMEM);
446                 }
447
448                 if (lmv->tgts_size) {
449                         memcpy(newtgts, lmv->tgts,
450                                sizeof(*newtgts) * lmv->tgts_size);
451                         old = lmv->tgts;
452                         oldsize = lmv->tgts_size;
453                 }
454
455                 lmv->tgts = newtgts;
456                 lmv->tgts_size = newsize;
457                 smp_rmb();
458                 if (old)
459                         OBD_FREE(old, sizeof(*old) * oldsize);
460
461                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
462                        lmv->tgts_size);
463         }
464
465         OBD_ALLOC_PTR(tgt);
466         if (!tgt) {
467                 mutex_unlock(&lmv->lmv_init_mutex);
468                 RETURN(-ENOMEM);
469         }
470
471         mutex_init(&tgt->ltd_fid_mutex);
472         tgt->ltd_idx = index;
473         tgt->ltd_uuid = *uuidp;
474         tgt->ltd_active = 0;
475         lmv->tgts[index] = tgt;
476         if (index >= lmv->desc.ld_tgt_count) {
477                 orig_tgt_count = lmv->desc.ld_tgt_count;
478                 lmv->desc.ld_tgt_count = index + 1;
479         }
480
481         if (lmv->connected == 0) {
482                 /* lmv_check_connect() will connect this target. */
483                 mutex_unlock(&lmv->lmv_init_mutex);
484                 RETURN(0);
485         }
486
487         /* Otherwise let's connect it ourselves */
488         mutex_unlock(&lmv->lmv_init_mutex);
489         rc = lmv_connect_mdc(obd, tgt);
490         if (rc != 0) {
491                 spin_lock(&lmv->lmv_lock);
492                 if (lmv->desc.ld_tgt_count == index + 1)
493                         lmv->desc.ld_tgt_count = orig_tgt_count;
494                 memset(tgt, 0, sizeof(*tgt));
495                 spin_unlock(&lmv->lmv_lock);
496         } else {
497                 int easize = sizeof(struct lmv_stripe_md) +
498                         lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
499                 lmv_init_ea_size(obd->obd_self_export, easize, 0);
500         }
501
502         RETURN(rc);
503 }
504
505 static int lmv_check_connect(struct obd_device *obd)
506 {
507         struct lmv_obd          *lmv = &obd->u.lmv;
508         struct lmv_tgt_desc     *tgt;
509         __u32                    i;
510         int                      rc;
511         int                      easize;
512         ENTRY;
513
514         if (lmv->connected)
515                 RETURN(0);
516
517         mutex_lock(&lmv->lmv_init_mutex);
518         if (lmv->connected) {
519                 mutex_unlock(&lmv->lmv_init_mutex);
520                 RETURN(0);
521         }
522
523         if (lmv->desc.ld_tgt_count == 0) {
524                 mutex_unlock(&lmv->lmv_init_mutex);
525                 CERROR("%s: no targets configured.\n", obd->obd_name);
526                 RETURN(-EINVAL);
527         }
528
529         LASSERT(lmv->tgts != NULL);
530
531         if (lmv->tgts[0] == NULL) {
532                 mutex_unlock(&lmv->lmv_init_mutex);
533                 CERROR("%s: no target configured for index 0.\n",
534                        obd->obd_name);
535                 RETURN(-EINVAL);
536         }
537
538         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
539                lmv->cluuid.uuid, obd->obd_name);
540
541         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
542                 tgt = lmv->tgts[i];
543                 if (tgt == NULL)
544                         continue;
545                 rc = lmv_connect_mdc(obd, tgt);
546                 if (rc)
547                         GOTO(out_disc, rc);
548         }
549
550         lmv->connected = 1;
551         easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC);
552         lmv_init_ea_size(obd->obd_self_export, easize, 0);
553         mutex_unlock(&lmv->lmv_init_mutex);
554         RETURN(0);
555
556  out_disc:
557         while (i-- > 0) {
558                 int rc2;
559                 tgt = lmv->tgts[i];
560                 if (tgt == NULL)
561                         continue;
562                 tgt->ltd_active = 0;
563                 if (tgt->ltd_exp) {
564                         --lmv->desc.ld_active_tgt_count;
565                         rc2 = obd_disconnect(tgt->ltd_exp);
566                         if (rc2) {
567                                 CERROR("LMV target %s disconnect on "
568                                        "MDC idx %d: error %d\n",
569                                        tgt->ltd_uuid.uuid, i, rc2);
570                         }
571                 }
572         }
573
574         mutex_unlock(&lmv->lmv_init_mutex);
575
576         RETURN(rc);
577 }
578
579 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
580 {
581         struct lmv_obd         *lmv = &obd->u.lmv;
582         struct obd_device      *mdc_obd;
583         int                     rc;
584         ENTRY;
585
586         LASSERT(tgt != NULL);
587         LASSERT(obd != NULL);
588
589         mdc_obd = class_exp2obd(tgt->ltd_exp);
590
591         if (mdc_obd) {
592                 mdc_obd->obd_force = obd->obd_force;
593                 mdc_obd->obd_fail = obd->obd_fail;
594                 mdc_obd->obd_no_recov = obd->obd_no_recov;
595
596                 if (lmv->targets_proc_entry != NULL)
597                         lprocfs_remove_proc_entry(mdc_obd->obd_name,
598                                                   lmv->targets_proc_entry);
599         }
600
601         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
602         if (rc)
603                 CERROR("Can't finanize fids factory\n");
604
605         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
606                tgt->ltd_exp->exp_obd->obd_name,
607                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
608
609         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
610         rc = obd_disconnect(tgt->ltd_exp);
611         if (rc) {
612                 if (tgt->ltd_active) {
613                         CERROR("Target %s disconnect error %d\n",
614                                tgt->ltd_uuid.uuid, rc);
615                 }
616         }
617
618         lmv_activate_target(lmv, tgt, 0);
619         tgt->ltd_exp = NULL;
620         RETURN(0);
621 }
622
623 static int lmv_disconnect(struct obd_export *exp)
624 {
625         struct obd_device       *obd = class_exp2obd(exp);
626         struct lmv_obd          *lmv = &obd->u.lmv;
627         int                      rc;
628         __u32                    i;
629         ENTRY;
630
631         if (!lmv->tgts)
632                 goto out_local;
633
634         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
635                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
636                         continue;
637
638                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
639         }
640
641         if (lmv->targets_proc_entry != NULL)
642                 lprocfs_remove(&lmv->targets_proc_entry);
643         else
644                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
645                        obd->obd_type->typ_name, obd->obd_name);
646
647 out_local:
648         /*
649          * This is the case when no real connection is established by
650          * lmv_check_connect().
651          */
652         if (!lmv->connected)
653                 class_export_put(exp);
654         rc = class_disconnect(exp);
655         lmv->connected = 0;
656
657         RETURN(rc);
658 }
659
660 static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
661                         void __user *uarg)
662 {
663         struct obd_device       *obddev = class_exp2obd(exp);
664         struct lmv_obd          *lmv = &obddev->u.lmv;
665         struct getinfo_fid2path *gf;
666         struct lmv_tgt_desc     *tgt;
667         struct getinfo_fid2path *remote_gf = NULL;
668         struct lu_fid           root_fid;
669         int                     remote_gf_size = 0;
670         int                     rc;
671
672         gf = karg;
673         tgt = lmv_find_target(lmv, &gf->gf_fid);
674         if (IS_ERR(tgt))
675                 RETURN(PTR_ERR(tgt));
676
677         root_fid = *gf->gf_u.gf_root_fid;
678         LASSERT(fid_is_sane(&root_fid));
679
680 repeat_fid2path:
681         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
682         if (rc != 0 && rc != -EREMOTE)
683                 GOTO(out_fid2path, rc);
684
685         /* If remote_gf != NULL, it means just building the
686          * path on the remote MDT, copy this path segement to gf */
687         if (remote_gf != NULL) {
688                 struct getinfo_fid2path *ori_gf;
689                 char *ptr;
690
691                 ori_gf = (struct getinfo_fid2path *)karg;
692                 if (strlen(ori_gf->gf_u.gf_path) +
693                     strlen(gf->gf_u.gf_path) > ori_gf->gf_pathlen)
694                         GOTO(out_fid2path, rc = -EOVERFLOW);
695
696                 ptr = ori_gf->gf_u.gf_path;
697
698                 memmove(ptr + strlen(gf->gf_u.gf_path) + 1, ptr,
699                         strlen(ori_gf->gf_u.gf_path));
700
701                 strncpy(ptr, gf->gf_u.gf_path,
702                         strlen(gf->gf_u.gf_path));
703                 ptr += strlen(gf->gf_u.gf_path);
704                 *ptr = '/';
705         }
706
707         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
708                tgt->ltd_exp->exp_obd->obd_name,
709                gf->gf_u.gf_path, PFID(&gf->gf_fid), gf->gf_recno,
710                gf->gf_linkno);
711
712         if (rc == 0)
713                 GOTO(out_fid2path, rc);
714
715         /* sigh, has to go to another MDT to do path building further */
716         if (remote_gf == NULL) {
717                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
718                 OBD_ALLOC(remote_gf, remote_gf_size);
719                 if (remote_gf == NULL)
720                         GOTO(out_fid2path, rc = -ENOMEM);
721                 remote_gf->gf_pathlen = PATH_MAX;
722         }
723
724         if (!fid_is_sane(&gf->gf_fid)) {
725                 CERROR("%s: invalid FID "DFID": rc = %d\n",
726                        tgt->ltd_exp->exp_obd->obd_name,
727                        PFID(&gf->gf_fid), -EINVAL);
728                 GOTO(out_fid2path, rc = -EINVAL);
729         }
730
731         tgt = lmv_find_target(lmv, &gf->gf_fid);
732         if (IS_ERR(tgt))
733                 GOTO(out_fid2path, rc = -EINVAL);
734
735         remote_gf->gf_fid = gf->gf_fid;
736         remote_gf->gf_recno = -1;
737         remote_gf->gf_linkno = -1;
738         memset(remote_gf->gf_u.gf_path, 0, remote_gf->gf_pathlen);
739         *remote_gf->gf_u.gf_root_fid = root_fid;
740         gf = remote_gf;
741         goto repeat_fid2path;
742
743 out_fid2path:
744         if (remote_gf != NULL)
745                 OBD_FREE(remote_gf, remote_gf_size);
746         RETURN(rc);
747 }
748
749 static int lmv_hsm_req_count(struct lmv_obd *lmv,
750                              const struct hsm_user_request *hur,
751                              const struct lmv_tgt_desc *tgt_mds)
752 {
753         __u32                    i;
754         int                      nr = 0;
755         struct lmv_tgt_desc     *curr_tgt;
756
757         /* count how many requests must be sent to the given target */
758         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
759                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
760                 if (IS_ERR(curr_tgt))
761                         RETURN(PTR_ERR(curr_tgt));
762                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
763                         nr++;
764         }
765         return nr;
766 }
767
768 static int lmv_hsm_req_build(struct lmv_obd *lmv,
769                               struct hsm_user_request *hur_in,
770                               const struct lmv_tgt_desc *tgt_mds,
771                               struct hsm_user_request *hur_out)
772 {
773         __u32                    i, nr_out;
774         struct lmv_tgt_desc     *curr_tgt;
775
776         /* build the hsm_user_request for the given target */
777         hur_out->hur_request = hur_in->hur_request;
778         nr_out = 0;
779         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
780                 curr_tgt = lmv_find_target(lmv,
781                                            &hur_in->hur_user_item[i].hui_fid);
782                 if (IS_ERR(curr_tgt))
783                         RETURN(PTR_ERR(curr_tgt));
784                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
785                         hur_out->hur_user_item[nr_out] =
786                                                 hur_in->hur_user_item[i];
787                         nr_out++;
788                 }
789         }
790         hur_out->hur_request.hr_itemcount = nr_out;
791         memcpy(hur_data(hur_out), hur_data(hur_in),
792                hur_in->hur_request.hr_data_len);
793
794         RETURN(0);
795 }
796
797 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
798                                  struct lustre_kernelcomm *lk,
799                                  void __user *uarg)
800 {
801         __u32   i;
802         int     rc;
803         ENTRY;
804
805         /* unregister request (call from llapi_hsm_copytool_fini) */
806         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
807                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
808
809                 if (tgt == NULL || tgt->ltd_exp == NULL)
810                         continue;
811                 /* best effort: try to clean as much as possible
812                  * (continue on error) */
813                 obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
814         }
815
816         /* Whatever the result, remove copytool from kuc groups.
817          * Unreached coordinators will get EPIPE on next requests
818          * and will unregister automatically.
819          */
820         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
821
822         RETURN(rc);
823 }
824
825 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
826                                struct lustre_kernelcomm *lk, __user void *uarg)
827 {
828         struct file             *filp;
829         __u32                    i, j;
830         int                      err, rc;
831         bool                     any_set = false;
832         struct kkuc_ct_data      kcd = { 0 };
833         ENTRY;
834
835         /* All or nothing: try to register to all MDS.
836          * In case of failure, unregister from previous MDS,
837          * except if it because of inactive target. */
838         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
839                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
840
841                 if (tgt == NULL || tgt->ltd_exp == NULL)
842                         continue;
843                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
844                 if (err) {
845                         if (tgt->ltd_active) {
846                                 /* permanent error */
847                                 CERROR("%s: iocontrol MDC %s on MDT"
848                                        " idx %d cmd %x: err = %d\n",
849                                        lmv2obd_dev(lmv)->obd_name,
850                                        tgt->ltd_uuid.uuid, i, cmd, err);
851                                 rc = err;
852                                 lk->lk_flags |= LK_FLG_STOP;
853                                 /* unregister from previous MDS */
854                                 for (j = 0; j < i; j++) {
855                                         tgt = lmv->tgts[j];
856                                         if (tgt == NULL || tgt->ltd_exp == NULL)
857                                                 continue;
858                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
859                                                       lk, uarg);
860                                 }
861                                 RETURN(rc);
862                         }
863                         /* else: transient error.
864                          * kuc will register to the missing MDT
865                          * when it is back */
866                 } else {
867                         any_set = true;
868                 }
869         }
870
871         if (!any_set)
872                 /* no registration done: return error */
873                 RETURN(-ENOTCONN);
874
875         /* at least one registration done, with no failure */
876         filp = fget(lk->lk_wfd);
877         if (filp == NULL)
878                 RETURN(-EBADF);
879
880         kcd.kcd_magic = KKUC_CT_DATA_MAGIC;
881         kcd.kcd_uuid = lmv->cluuid;
882         kcd.kcd_archive = lk->lk_data;
883
884         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group,
885                                    &kcd, sizeof(kcd));
886         if (rc != 0)
887                 fput(filp);
888
889         RETURN(rc);
890 }
891
892
893
894
895 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
896                          int len, void *karg, void __user *uarg)
897 {
898         struct obd_device       *obddev = class_exp2obd(exp);
899         struct lmv_obd          *lmv = &obddev->u.lmv;
900         struct lmv_tgt_desc     *tgt = NULL;
901         __u32                    i = 0;
902         int                      rc = 0;
903         int                      set = 0;
904         __u32                    count = lmv->desc.ld_tgt_count;
905         ENTRY;
906
907         if (count == 0)
908                 RETURN(-ENOTTY);
909
910         switch (cmd) {
911         case IOC_OBD_STATFS: {
912                 struct obd_ioctl_data *data = karg;
913                 struct obd_device *mdc_obd;
914                 struct obd_statfs stat_buf = {0};
915                 __u32 index;
916
917                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
918                 if ((index >= count))
919                         RETURN(-ENODEV);
920
921                 tgt = lmv->tgts[index];
922                 if (tgt == NULL || !tgt->ltd_active)
923                         RETURN(-ENODATA);
924
925                 mdc_obd = class_exp2obd(tgt->ltd_exp);
926                 if (!mdc_obd)
927                         RETURN(-EINVAL);
928
929                 /* copy UUID */
930                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
931                                  min((int) data->ioc_plen2,
932                                      (int) sizeof(struct obd_uuid))))
933                         RETURN(-EFAULT);
934
935                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
936                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
937                                 0);
938                 if (rc)
939                         RETURN(rc);
940                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
941                                  min((int) data->ioc_plen1,
942                                      (int) sizeof(stat_buf))))
943                         RETURN(-EFAULT);
944                 break;
945         }
946         case OBD_IOC_QUOTACTL: {
947                 struct if_quotactl *qctl = karg;
948                 struct obd_quotactl *oqctl;
949
950                 if (qctl->qc_valid == QC_MDTIDX) {
951                         if (count <= qctl->qc_idx)
952                                 RETURN(-EINVAL);
953
954                         tgt = lmv->tgts[qctl->qc_idx];
955                         if (tgt == NULL || tgt->ltd_exp == NULL)
956                                 RETURN(-EINVAL);
957                 } else if (qctl->qc_valid == QC_UUID) {
958                         for (i = 0; i < count; i++) {
959                                 tgt = lmv->tgts[i];
960                                 if (tgt == NULL)
961                                         continue;
962                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
963                                                      &qctl->obd_uuid))
964                                         continue;
965
966                                 if (tgt->ltd_exp == NULL)
967                                         RETURN(-EINVAL);
968
969                                 break;
970                         }
971                 } else {
972                         RETURN(-EINVAL);
973                 }
974
975                 if (i >= count)
976                         RETURN(-EAGAIN);
977
978                 LASSERT(tgt != NULL && tgt->ltd_exp != NULL);
979                 OBD_ALLOC_PTR(oqctl);
980                 if (!oqctl)
981                         RETURN(-ENOMEM);
982
983                 QCTL_COPY(oqctl, qctl);
984                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
985                 if (rc == 0) {
986                         QCTL_COPY(qctl, oqctl);
987                         qctl->qc_valid = QC_MDTIDX;
988                         qctl->obd_uuid = tgt->ltd_uuid;
989                 }
990                 OBD_FREE_PTR(oqctl);
991                 break;
992         }
993         case OBD_IOC_CHANGELOG_SEND:
994         case OBD_IOC_CHANGELOG_CLEAR: {
995                 struct ioc_changelog *icc = karg;
996
997                 if (icc->icc_mdtindex >= count)
998                         RETURN(-ENODEV);
999
1000                 tgt = lmv->tgts[icc->icc_mdtindex];
1001                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
1002                         RETURN(-ENODEV);
1003                 rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
1004                 break;
1005         }
1006         case LL_IOC_GET_CONNECT_FLAGS: {
1007                 tgt = lmv->tgts[0];
1008                 if (tgt == NULL || tgt->ltd_exp == NULL)
1009                         RETURN(-ENODATA);
1010                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1011                 break;
1012         }
1013         case LL_IOC_FID2MDTIDX: {
1014                 struct lu_fid *fid = karg;
1015                 int             mdt_index;
1016
1017                 rc = lmv_fld_lookup(lmv, fid, &mdt_index);
1018                 if (rc != 0)
1019                         RETURN(rc);
1020
1021                 /* Note: this is from llite(see ll_dir_ioctl()), @uarg does not
1022                  * point to user space memory for FID2MDTIDX. */
1023                 *(__u32 *)uarg = mdt_index;
1024                 break;
1025         }
1026         case OBD_IOC_FID2PATH: {
1027                 rc = lmv_fid2path(exp, len, karg, uarg);
1028                 break;
1029         }
1030         case LL_IOC_HSM_STATE_GET:
1031         case LL_IOC_HSM_STATE_SET:
1032         case LL_IOC_HSM_ACTION: {
1033                 struct md_op_data       *op_data = karg;
1034
1035                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1036                 if (IS_ERR(tgt))
1037                         RETURN(PTR_ERR(tgt));
1038
1039                 if (tgt->ltd_exp == NULL)
1040                         RETURN(-EINVAL);
1041
1042                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1043                 break;
1044         }
1045         case LL_IOC_HSM_PROGRESS: {
1046                 const struct hsm_progress_kernel *hpk = karg;
1047
1048                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1049                 if (IS_ERR(tgt))
1050                         RETURN(PTR_ERR(tgt));
1051                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1052                 break;
1053         }
1054         case LL_IOC_HSM_REQUEST: {
1055                 struct hsm_user_request *hur = karg;
1056                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1057
1058                 if (reqcount == 0)
1059                         RETURN(0);
1060
1061                 /* if the request is about a single fid
1062                  * or if there is a single MDS, no need to split
1063                  * the request. */
1064                 if (reqcount == 1 || count == 1) {
1065                         tgt = lmv_find_target(lmv,
1066                                               &hur->hur_user_item[0].hui_fid);
1067                         if (IS_ERR(tgt))
1068                                 RETURN(PTR_ERR(tgt));
1069                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1070                 } else {
1071                         /* split fid list to their respective MDS */
1072                         for (i = 0; i < count; i++) {
1073                                 int nr, rc1;
1074                                 size_t reqlen;
1075                                 struct hsm_user_request *req;
1076
1077                                 tgt = lmv->tgts[i];
1078                                 if (tgt == NULL || tgt->ltd_exp == NULL)
1079                                         continue;
1080
1081                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
1082                                 if (nr < 0)
1083                                         RETURN(nr);
1084                                 if (nr == 0) /* nothing for this MDS */
1085                                         continue;
1086
1087                                 /* build a request with fids for this MDS */
1088                                 reqlen = offsetof(typeof(*hur),
1089                                                   hur_user_item[nr])
1090                                                 + hur->hur_request.hr_data_len;
1091                                 OBD_ALLOC_LARGE(req, reqlen);
1092                                 if (req == NULL)
1093                                         RETURN(-ENOMEM);
1094                                 rc1 = lmv_hsm_req_build(lmv, hur, tgt, req);
1095                                 if (rc1 < 0)
1096                                         GOTO(hsm_req_err, rc1);
1097                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1098                                                     req, uarg);
1099 hsm_req_err:
1100                                 if (rc1 != 0 && rc == 0)
1101                                         rc = rc1;
1102                                 OBD_FREE_LARGE(req, reqlen);
1103                         }
1104                 }
1105                 break;
1106         }
1107         case LL_IOC_LOV_SWAP_LAYOUTS: {
1108                 struct md_op_data       *op_data = karg;
1109                 struct lmv_tgt_desc     *tgt1, *tgt2;
1110
1111                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1112                 if (IS_ERR(tgt1))
1113                         RETURN(PTR_ERR(tgt1));
1114
1115                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1116                 if (IS_ERR(tgt2))
1117                         RETURN(PTR_ERR(tgt2));
1118
1119                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1120                         RETURN(-EINVAL);
1121
1122                 /* only files on same MDT can have their layouts swapped */
1123                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1124                         RETURN(-EPERM);
1125
1126                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1127                 break;
1128         }
1129         case LL_IOC_HSM_CT_START: {
1130                 struct lustre_kernelcomm *lk = karg;
1131                 if (lk->lk_flags & LK_FLG_STOP)
1132                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1133                 else
1134                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1135                 break;
1136         }
1137         default:
1138                 for (i = 0; i < count; i++) {
1139                         struct obd_device *mdc_obd;
1140                         int err;
1141
1142                         tgt = lmv->tgts[i];
1143                         if (tgt == NULL || tgt->ltd_exp == NULL)
1144                                 continue;
1145                         /* ll_umount_begin() sets force flag but for lmv, not
1146                          * mdc. Let's pass it through */
1147                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1148                         mdc_obd->obd_force = obddev->obd_force;
1149                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1150                         if (err) {
1151                                 if (tgt->ltd_active) {
1152                                         CERROR("error: iocontrol MDC %s on MDT"
1153                                                " idx %d cmd %x: err = %d\n",
1154                                                tgt->ltd_uuid.uuid, i, cmd, err);
1155                                         if (!rc)
1156                                                 rc = err;
1157                                 }
1158                         } else
1159                                 set = 1;
1160                 }
1161                 if (!set && !rc)
1162                         rc = -EIO;
1163         }
1164         RETURN(rc);
1165 }
1166
1167 /**
1168  * This is _inode_ placement policy function (not name).
1169  */
1170 static int lmv_placement_policy(struct obd_device *obd,
1171                                 struct md_op_data *op_data, u32 *mds)
1172 {
1173         struct lmv_obd          *lmv = &obd->u.lmv;
1174         ENTRY;
1175
1176         LASSERT(mds != NULL);
1177
1178         if (lmv->desc.ld_tgt_count == 1) {
1179                 *mds = 0;
1180                 RETURN(0);
1181         }
1182
1183         if (op_data->op_default_stripe_offset != -1) {
1184                 *mds = op_data->op_default_stripe_offset;
1185                 RETURN(0);
1186         }
1187
1188         /**
1189          * If stripe_offset is provided during setdirstripe
1190          * (setdirstripe -i xx), xx MDS will be choosen.
1191          */
1192         if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data != NULL) {
1193                 struct lmv_user_md *lum;
1194
1195                 lum = op_data->op_data;
1196
1197                 if (le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) {
1198                         *mds = le32_to_cpu(lum->lum_stripe_offset);
1199                 } else {
1200                         /* -1 means default, which will be in the same MDT with
1201                          * the stripe */
1202                         *mds = op_data->op_mds;
1203                         lum->lum_stripe_offset = cpu_to_le32(op_data->op_mds);
1204                 }
1205         } else {
1206                 /* Allocate new fid on target according to operation type and
1207                  * parent home mds. */
1208                 *mds = op_data->op_mds;
1209         }
1210
1211         RETURN(0);
1212 }
1213
1214 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1215 {
1216         struct lmv_tgt_desc     *tgt;
1217         int                      rc;
1218         ENTRY;
1219
1220         tgt = lmv_get_target(lmv, mds, NULL);
1221         if (IS_ERR(tgt))
1222                 RETURN(PTR_ERR(tgt));
1223
1224         /*
1225          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1226          * on server that seq in new allocated fid is not yet known.
1227          */
1228         mutex_lock(&tgt->ltd_fid_mutex);
1229
1230         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1231                 GOTO(out, rc = -ENODEV);
1232
1233         /*
1234          * Asking underlying tgt layer to allocate new fid.
1235          */
1236         rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1237         if (rc > 0) {
1238                 LASSERT(fid_is_sane(fid));
1239                 rc = 0;
1240         }
1241
1242         EXIT;
1243 out:
1244         mutex_unlock(&tgt->ltd_fid_mutex);
1245         return rc;
1246 }
1247
1248 int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1249                   struct lu_fid *fid, struct md_op_data *op_data)
1250 {
1251         struct obd_device     *obd = class_exp2obd(exp);
1252         struct lmv_obd        *lmv = &obd->u.lmv;
1253         u32                    mds = 0;
1254         int                    rc;
1255         ENTRY;
1256
1257         LASSERT(op_data != NULL);
1258         LASSERT(fid != NULL);
1259
1260         rc = lmv_placement_policy(obd, op_data, &mds);
1261         if (rc) {
1262                 CERROR("Can't get target for allocating fid, "
1263                        "rc %d\n", rc);
1264                 RETURN(rc);
1265         }
1266
1267         rc = __lmv_fid_alloc(lmv, fid, mds);
1268         if (rc) {
1269                 CERROR("Can't alloc new fid, rc %d\n", rc);
1270                 RETURN(rc);
1271         }
1272
1273         RETURN(rc);
1274 }
1275
1276 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1277 {
1278         struct lmv_obd  *lmv = &obd->u.lmv;
1279         struct lmv_desc *desc;
1280         int             rc;
1281         ENTRY;
1282
1283         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1284                 CERROR("LMV setup requires a descriptor\n");
1285                 RETURN(-EINVAL);
1286         }
1287
1288         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1289         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1290                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1291                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1292                 RETURN(-EINVAL);
1293         }
1294
1295         lmv->tgts_size = 32U;
1296         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1297         if (lmv->tgts == NULL)
1298                 RETURN(-ENOMEM);
1299
1300         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1301         lmv->desc.ld_tgt_count = 0;
1302         lmv->desc.ld_active_tgt_count = 0;
1303         lmv->max_def_easize = 0;
1304         lmv->max_easize = 0;
1305
1306         spin_lock_init(&lmv->lmv_lock);
1307         mutex_init(&lmv->lmv_init_mutex);
1308
1309 #ifdef CONFIG_PROC_FS
1310         obd->obd_vars = lprocfs_lmv_obd_vars;
1311         lprocfs_obd_setup(obd);
1312         lprocfs_alloc_md_stats(obd, 0);
1313         rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1314                                 0444, &lmv_proc_target_fops, obd);
1315         if (rc)
1316                 CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1317                       obd->obd_name, rc);
1318 #endif
1319         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1320                              LUSTRE_CLI_FLD_HASH_DHT);
1321         if (rc) {
1322                 CERROR("Can't init FLD, err %d\n", rc);
1323                 GOTO(out, rc);
1324         }
1325
1326         RETURN(0);
1327
1328 out:
1329         return rc;
1330 }
1331
1332 static int lmv_cleanup(struct obd_device *obd)
1333 {
1334         struct lmv_obd   *lmv = &obd->u.lmv;
1335         ENTRY;
1336
1337         fld_client_fini(&lmv->lmv_fld);
1338         if (lmv->tgts != NULL) {
1339                 int i;
1340                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1341                         if (lmv->tgts[i] == NULL)
1342                                 continue;
1343                         lmv_del_target(lmv, i);
1344                 }
1345                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1346                 lmv->tgts_size = 0;
1347         }
1348         RETURN(0);
1349 }
1350
1351 static int lmv_process_config(struct obd_device *obd, size_t len, void *buf)
1352 {
1353         struct lustre_cfg       *lcfg = buf;
1354         struct obd_uuid         obd_uuid;
1355         int                     gen;
1356         __u32                   index;
1357         int                     rc;
1358         ENTRY;
1359
1360         switch (lcfg->lcfg_command) {
1361         case LCFG_ADD_MDC:
1362                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1363                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1364                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1365                         GOTO(out, rc = -EINVAL);
1366
1367                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1368
1369                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1)
1370                         GOTO(out, rc = -EINVAL);
1371                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1372                         GOTO(out, rc = -EINVAL);
1373                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1374                 GOTO(out, rc);
1375         default:
1376                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1377                 GOTO(out, rc = -EINVAL);
1378         }
1379 out:
1380         RETURN(rc);
1381 }
1382
1383 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1384                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1385 {
1386         struct obd_device       *obd = class_exp2obd(exp);
1387         struct lmv_obd          *lmv = &obd->u.lmv;
1388         struct obd_statfs       *temp;
1389         int                      rc = 0;
1390         __u32                    i;
1391         ENTRY;
1392
1393         OBD_ALLOC(temp, sizeof(*temp));
1394         if (temp == NULL)
1395                 RETURN(-ENOMEM);
1396
1397         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1398                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1399                         continue;
1400
1401                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1402                                 max_age, flags);
1403                 if (rc) {
1404                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1405                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1406                                rc);
1407                         GOTO(out_free_temp, rc);
1408                 }
1409
1410                 if (i == 0) {
1411                         *osfs = *temp;
1412                         /* If the statfs is from mount, it will needs
1413                          * retrieve necessary information from MDT0.
1414                          * i.e. mount does not need the merged osfs
1415                          * from all of MDT.
1416                          * And also clients can be mounted as long as
1417                          * MDT0 is in service*/
1418                         if (flags & OBD_STATFS_FOR_MDT0)
1419                                 GOTO(out_free_temp, rc);
1420                 } else {
1421                         osfs->os_bavail += temp->os_bavail;
1422                         osfs->os_blocks += temp->os_blocks;
1423                         osfs->os_ffree += temp->os_ffree;
1424                         osfs->os_files += temp->os_files;
1425                 }
1426         }
1427
1428         EXIT;
1429 out_free_temp:
1430         OBD_FREE(temp, sizeof(*temp));
1431         return rc;
1432 }
1433
1434 static int lmv_get_root(struct obd_export *exp, const char *fileset,
1435                         struct lu_fid *fid)
1436 {
1437         struct obd_device    *obd = exp->exp_obd;
1438         struct lmv_obd       *lmv = &obd->u.lmv;
1439         int                   rc;
1440         ENTRY;
1441
1442         rc = md_get_root(lmv->tgts[0]->ltd_exp, fileset, fid);
1443         RETURN(rc);
1444 }
1445
1446 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1447                         u64 valid, const char *name,
1448                         const char *input, int input_size, int output_size,
1449                         int flags, struct ptlrpc_request **request)
1450 {
1451         struct obd_device      *obd = exp->exp_obd;
1452         struct lmv_obd         *lmv = &obd->u.lmv;
1453         struct lmv_tgt_desc    *tgt;
1454         int                     rc;
1455         ENTRY;
1456
1457         tgt = lmv_find_target(lmv, fid);
1458         if (IS_ERR(tgt))
1459                 RETURN(PTR_ERR(tgt));
1460
1461         rc = md_getxattr(tgt->ltd_exp, fid, valid, name, input,
1462                          input_size, output_size, flags, request);
1463
1464         RETURN(rc);
1465 }
1466
1467 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1468                         u64 valid, const char *name,
1469                         const char *input, int input_size, int output_size,
1470                         int flags, __u32 suppgid,
1471                         struct ptlrpc_request **request)
1472 {
1473         struct obd_device      *obd = exp->exp_obd;
1474         struct lmv_obd         *lmv = &obd->u.lmv;
1475         struct lmv_tgt_desc    *tgt;
1476         int                     rc;
1477         ENTRY;
1478
1479         tgt = lmv_find_target(lmv, fid);
1480         if (IS_ERR(tgt))
1481                 RETURN(PTR_ERR(tgt));
1482
1483         rc = md_setxattr(tgt->ltd_exp, fid, valid, name, input,
1484                          input_size, output_size, flags, suppgid,
1485                          request);
1486
1487         RETURN(rc);
1488 }
1489
1490 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1491                        struct ptlrpc_request **request)
1492 {
1493         struct obd_device       *obd = exp->exp_obd;
1494         struct lmv_obd          *lmv = &obd->u.lmv;
1495         struct lmv_tgt_desc     *tgt;
1496         int                      rc;
1497         ENTRY;
1498
1499         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1500         if (IS_ERR(tgt))
1501                 RETURN(PTR_ERR(tgt));
1502
1503         if (op_data->op_flags & MF_GET_MDT_IDX) {
1504                 op_data->op_mds = tgt->ltd_idx;
1505                 RETURN(0);
1506         }
1507
1508         rc = md_getattr(tgt->ltd_exp, op_data, request);
1509
1510         RETURN(rc);
1511 }
1512
1513 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1514 {
1515         struct obd_device   *obd = exp->exp_obd;
1516         struct lmv_obd      *lmv = &obd->u.lmv;
1517         __u32                i;
1518         ENTRY;
1519
1520         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1521
1522         /*
1523          * With DNE every object can have two locks in different namespaces:
1524          * lookup lock in space of MDT storing direntry and update/open lock in
1525          * space of MDT storing inode.
1526          */
1527         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1528                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1529                         continue;
1530                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1531         }
1532
1533         RETURN(0);
1534 }
1535
1536 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1537                      struct md_open_data *mod, struct ptlrpc_request **request)
1538 {
1539         struct obd_device     *obd = exp->exp_obd;
1540         struct lmv_obd        *lmv = &obd->u.lmv;
1541         struct lmv_tgt_desc   *tgt;
1542         int                    rc;
1543         ENTRY;
1544
1545         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1546         if (IS_ERR(tgt))
1547                 RETURN(PTR_ERR(tgt));
1548
1549         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1550         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1551         RETURN(rc);
1552 }
1553
1554 /**
1555  * Choosing the MDT by name or FID in @op_data.
1556  * For non-striped directory, it will locate MDT by fid.
1557  * For striped-directory, it will locate MDT by name. And also
1558  * it will reset op_fid1 with the FID of the choosen stripe.
1559  **/
1560 struct lmv_tgt_desc *
1561 lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1562                            const char *name, int namelen, struct lu_fid *fid,
1563                            u32 *mds)
1564 {
1565         struct lmv_tgt_desc     *tgt;
1566         const struct lmv_oinfo  *oinfo;
1567
1568         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) {
1569                 if (cfs_fail_val >= lsm->lsm_md_stripe_count)
1570                         RETURN(ERR_PTR(-EBADF));
1571                 oinfo = &lsm->lsm_md_oinfo[cfs_fail_val];
1572         } else {
1573                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
1574                 if (IS_ERR(oinfo))
1575                         RETURN(ERR_CAST(oinfo));
1576         }
1577
1578         if (fid != NULL)
1579                 *fid = oinfo->lmo_fid;
1580         if (mds != NULL)
1581                 *mds = oinfo->lmo_mds;
1582
1583         tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL);
1584
1585         CDEBUG(D_INFO, "locate on mds %u "DFID"\n", oinfo->lmo_mds,
1586                PFID(&oinfo->lmo_fid));
1587         return tgt;
1588 }
1589
1590 /**
1591  * Locate mds by fid or name
1592  *
1593  * For striped directory (lsm != NULL), it will locate the stripe
1594  * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type
1595  * is unknown, it will return -EBADFD, and lmv_intent_lookup might need
1596  * walk through all of stripes to locate the entry.
1597  *
1598  * For normal direcotry, it will locate MDS by FID directly.
1599  * \param[in] lmv       LMV device
1600  * \param[in] op_data   client MD stack parameters, name, namelen
1601  *                      mds_num etc.
1602  * \param[in] fid       object FID used to locate MDS.
1603  *
1604  * retval               pointer to the lmv_tgt_desc if succeed.
1605  *                      ERR_PTR(errno) if failed.
1606  */
1607 struct lmv_tgt_desc*
1608 lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1609                struct lu_fid *fid)
1610 {
1611         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1612         struct lmv_tgt_desc     *tgt;
1613
1614         /* During creating VOLATILE file, it should honor the mdt
1615          * index if the file under striped dir is being restored, see
1616          * ct_restore(). */
1617         if (op_data->op_bias & MDS_CREATE_VOLATILE &&
1618             (int)op_data->op_mds != -1) {
1619                 int i;
1620                 tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
1621                 if (IS_ERR(tgt))
1622                         return tgt;
1623
1624                 if (lsm != NULL) {
1625                         /* refill the right parent fid */
1626                         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
1627                                 struct lmv_oinfo *oinfo;
1628
1629                                 oinfo = &lsm->lsm_md_oinfo[i];
1630                                 if (oinfo->lmo_mds == op_data->op_mds) {
1631                                         *fid = oinfo->lmo_fid;
1632                                         break;
1633                                 }
1634                         }
1635
1636                         if (i == lsm->lsm_md_stripe_count)
1637                                 *fid = lsm->lsm_md_oinfo[0].lmo_fid;
1638                 }
1639
1640                 return tgt;
1641         }
1642
1643         if (lsm == NULL || op_data->op_namelen == 0) {
1644                 tgt = lmv_find_target(lmv, fid);
1645                 if (IS_ERR(tgt))
1646                         return tgt;
1647
1648                 op_data->op_mds = tgt->ltd_idx;
1649                 return tgt;
1650         }
1651
1652         return lmv_locate_target_for_name(lmv, lsm, op_data->op_name,
1653                                           op_data->op_namelen, fid,
1654                                           &op_data->op_mds);
1655 }
1656
1657 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1658                 const void *data, size_t datalen, umode_t mode, uid_t uid,
1659                 gid_t gid, cfs_cap_t cap_effective, __u64 rdev,
1660                 struct ptlrpc_request **request)
1661 {
1662         struct obd_device       *obd = exp->exp_obd;
1663         struct lmv_obd          *lmv = &obd->u.lmv;
1664         struct lmv_tgt_desc     *tgt;
1665         int                      rc;
1666         ENTRY;
1667
1668         if (!lmv->desc.ld_active_tgt_count)
1669                 RETURN(-EIO);
1670
1671         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1672         if (IS_ERR(tgt))
1673                 RETURN(PTR_ERR(tgt));
1674
1675         CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n",
1676                 (int)op_data->op_namelen, op_data->op_name,
1677                 PFID(&op_data->op_fid1), op_data->op_mds);
1678
1679         rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1680         if (rc)
1681                 RETURN(rc);
1682         if (exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE) {
1683                 /* Send the create request to the MDT where the object
1684                  * will be located */
1685                 tgt = lmv_find_target(lmv, &op_data->op_fid2);
1686                 if (IS_ERR(tgt))
1687                         RETURN(PTR_ERR(tgt));
1688
1689                 op_data->op_mds = tgt->ltd_idx;
1690         } else {
1691                 CDEBUG(D_CONFIG, "Server doesn't support striped dirs\n");
1692         }
1693
1694         CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n",
1695                PFID(&op_data->op_fid2), op_data->op_mds);
1696
1697         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1698         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1699                        cap_effective, rdev, request);
1700         if (rc == 0) {
1701                 if (*request == NULL)
1702                         RETURN(rc);
1703                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1704         }
1705         RETURN(rc);
1706 }
1707
1708 static int
1709 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1710             const union ldlm_policy_data *policy,
1711             struct lookup_intent *it, struct md_op_data *op_data,
1712             struct lustre_handle *lockh, __u64 extra_lock_flags)
1713 {
1714         struct obd_device        *obd = exp->exp_obd;
1715         struct lmv_obd           *lmv = &obd->u.lmv;
1716         struct lmv_tgt_desc      *tgt;
1717         int                       rc;
1718         ENTRY;
1719
1720         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1721                LL_IT2STR(it), PFID(&op_data->op_fid1));
1722
1723         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1724         if (IS_ERR(tgt))
1725                 RETURN(PTR_ERR(tgt));
1726
1727         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%u\n",
1728                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1729
1730         rc = md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh,
1731                         extra_lock_flags);
1732
1733         RETURN(rc);
1734 }
1735
1736 static int
1737 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1738                  struct ptlrpc_request **preq)
1739 {
1740         struct ptlrpc_request   *req = NULL;
1741         struct obd_device       *obd = exp->exp_obd;
1742         struct lmv_obd          *lmv = &obd->u.lmv;
1743         struct lmv_tgt_desc     *tgt;
1744         struct mdt_body         *body;
1745         int                      rc;
1746         ENTRY;
1747
1748         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1749         if (IS_ERR(tgt))
1750                 RETURN(PTR_ERR(tgt));
1751
1752         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1753                 (int)op_data->op_namelen, op_data->op_name,
1754                 PFID(&op_data->op_fid1), tgt->ltd_idx);
1755
1756         rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
1757         if (rc != 0)
1758                 RETURN(rc);
1759
1760         body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
1761         LASSERT(body != NULL);
1762
1763         if (body->mbo_valid & OBD_MD_MDS) {
1764                 struct lu_fid rid = body->mbo_fid1;
1765                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1766                        PFID(&rid));
1767
1768                 tgt = lmv_find_target(lmv, &rid);
1769                 if (IS_ERR(tgt)) {
1770                         ptlrpc_req_finished(*preq);
1771                         preq = NULL;
1772                         RETURN(PTR_ERR(tgt));
1773                 }
1774
1775                 op_data->op_fid1 = rid;
1776                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1777                 op_data->op_namelen = 0;
1778                 op_data->op_name = NULL;
1779                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1780                 ptlrpc_req_finished(*preq);
1781                 *preq = req;
1782         }
1783
1784         RETURN(rc);
1785 }
1786
1787 #define md_op_data_fid(op_data, fl)                     \
1788         (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
1789          fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
1790          fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
1791          fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
1792          NULL)
1793
1794 static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
1795                             struct md_op_data *op_data, __u32 op_tgt,
1796                             enum ldlm_mode mode, int bits, int flag)
1797 {
1798         struct lu_fid *fid = md_op_data_fid(op_data, flag);
1799         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
1800         union ldlm_policy_data policy = { { 0 } };
1801         int rc = 0;
1802         ENTRY;
1803
1804         if (!fid_is_sane(fid))
1805                 RETURN(0);
1806
1807         if (tgt == NULL) {
1808                 tgt = lmv_find_target(lmv, fid);
1809                 if (IS_ERR(tgt))
1810                         RETURN(PTR_ERR(tgt));
1811         }
1812
1813         if (tgt->ltd_idx != op_tgt) {
1814                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1815                 policy.l_inodebits.bits = bits;
1816                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1817                                       mode, LCF_ASYNC, NULL);
1818         } else {
1819                 CDEBUG(D_INODE,
1820                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1821                        op_tgt, PFID(fid));
1822                 op_data->op_flags |= flag;
1823                 rc = 0;
1824         }
1825
1826         RETURN(rc);
1827 }
1828
1829 /*
1830  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1831  * op_data->op_fid2
1832  */
1833 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1834                     struct ptlrpc_request **request)
1835 {
1836         struct obd_device       *obd = exp->exp_obd;
1837         struct lmv_obd          *lmv = &obd->u.lmv;
1838         struct lmv_tgt_desc     *tgt;
1839         int                      rc;
1840         ENTRY;
1841
1842         LASSERT(op_data->op_namelen != 0);
1843
1844         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1845                PFID(&op_data->op_fid2), (int)op_data->op_namelen,
1846                op_data->op_name, PFID(&op_data->op_fid1));
1847
1848         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1849         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1850         op_data->op_cap = cfs_curproc_cap_pack();
1851         if (op_data->op_mea2 != NULL) {
1852                 struct lmv_stripe_md    *lsm = op_data->op_mea2;
1853                 const struct lmv_oinfo  *oinfo;
1854
1855                 oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
1856                                                 op_data->op_namelen);
1857                 if (IS_ERR(oinfo))
1858                         RETURN(PTR_ERR(oinfo));
1859
1860                 op_data->op_fid2 = oinfo->lmo_fid;
1861         }
1862
1863         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1864         if (IS_ERR(tgt))
1865                 RETURN(PTR_ERR(tgt));
1866
1867         /*
1868          * Cancel UPDATE lock on child (fid1).
1869          */
1870         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1871         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
1872                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1873         if (rc != 0)
1874                 RETURN(rc);
1875
1876         rc = md_link(tgt->ltd_exp, op_data, request);
1877
1878         RETURN(rc);
1879 }
1880
1881 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1882                       const char *old, size_t oldlen,
1883                       const char *new, size_t newlen,
1884                       struct ptlrpc_request **request)
1885 {
1886         struct obd_device       *obd = exp->exp_obd;
1887         struct lmv_obd          *lmv = &obd->u.lmv;
1888         struct lmv_tgt_desc     *src_tgt;
1889         struct lmv_tgt_desc     *tgt_tgt;
1890         struct obd_export       *target_exp;
1891         struct mdt_body         *body;
1892         int                     rc;
1893         ENTRY;
1894
1895         LASSERT(oldlen != 0);
1896
1897         CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
1898                (int)oldlen, old, PFID(&op_data->op_fid1),
1899                op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
1900                (int)newlen, new, PFID(&op_data->op_fid2),
1901                op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
1902
1903         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1904         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1905         op_data->op_cap = cfs_curproc_cap_pack();
1906         if (op_data->op_cli_flags & CLI_MIGRATE) {
1907                 LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
1908                          PFID(&op_data->op_fid3));
1909
1910                 if (op_data->op_mea1 != NULL) {
1911                         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1912                         struct lmv_tgt_desc     *tmp;
1913
1914                         /* Fix the parent fid for striped dir */
1915                         tmp = lmv_locate_target_for_name(lmv, lsm, old,
1916                                                          oldlen,
1917                                                          &op_data->op_fid1,
1918                                                          NULL);
1919                         if (IS_ERR(tmp))
1920                                 RETURN(PTR_ERR(tmp));
1921                 }
1922
1923                 rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1924                 if (rc != 0)
1925                         RETURN(rc);
1926
1927                 src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
1928                 if (IS_ERR(src_tgt))
1929                         RETURN(PTR_ERR(src_tgt));
1930
1931                 target_exp = src_tgt->ltd_exp;
1932         } else {
1933                 if (op_data->op_mea1 != NULL) {
1934                         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1935
1936                         src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
1937                                                              oldlen,
1938                                                              &op_data->op_fid1,
1939                                                              &op_data->op_mds);
1940                 } else {
1941                         src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
1942                 }
1943                 if (IS_ERR(src_tgt))
1944                         RETURN(PTR_ERR(src_tgt));
1945
1946
1947                 if (op_data->op_mea2 != NULL) {
1948                         struct lmv_stripe_md    *lsm = op_data->op_mea2;
1949
1950                         tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
1951                                                              newlen,
1952                                                              &op_data->op_fid2,
1953                                                              &op_data->op_mds);
1954                 } else {
1955                         tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
1956
1957                 }
1958                 if (IS_ERR(tgt_tgt))
1959                         RETURN(PTR_ERR(tgt_tgt));
1960
1961                 target_exp = tgt_tgt->ltd_exp;
1962         }
1963
1964         /*
1965          * LOOKUP lock on src child (fid3) should also be cancelled for
1966          * src_tgt in mdc_rename.
1967          */
1968         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1969
1970         /*
1971          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1972          * own target.
1973          */
1974         rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1975                               LCK_EX, MDS_INODELOCK_UPDATE,
1976                               MF_MDC_CANCEL_FID2);
1977
1978         if (rc != 0)
1979                 RETURN(rc);
1980         /*
1981          * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
1982          */
1983         if (fid_is_sane(&op_data->op_fid3)) {
1984                 struct lmv_tgt_desc *tgt;
1985
1986                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1987                 if (IS_ERR(tgt))
1988                         RETURN(PTR_ERR(tgt));
1989
1990                 /* Cancel LOOKUP lock on its parent */
1991                 rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
1992                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1993                                       MF_MDC_CANCEL_FID3);
1994                 if (rc != 0)
1995                         RETURN(rc);
1996
1997                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1998                                       LCK_EX, MDS_INODELOCK_FULL,
1999                                       MF_MDC_CANCEL_FID3);
2000                 if (rc != 0)
2001                         RETURN(rc);
2002         }
2003
2004 retry_rename:
2005         /*
2006          * Cancel all the locks on tgt child (fid4).
2007          */
2008         if (fid_is_sane(&op_data->op_fid4)) {
2009                 struct lmv_tgt_desc *tgt;
2010
2011                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
2012                                       LCK_EX, MDS_INODELOCK_FULL,
2013                                       MF_MDC_CANCEL_FID4);
2014                 if (rc != 0)
2015                         RETURN(rc);
2016
2017                 tgt = lmv_find_target(lmv, &op_data->op_fid4);
2018                 if (IS_ERR(tgt))
2019                         RETURN(PTR_ERR(tgt));
2020
2021                 /* Since the target child might be destroyed, and it might
2022                  * become orphan, and we can only check orphan on the local
2023                  * MDT right now, so we send rename request to the MDT where
2024                  * target child is located. If target child does not exist,
2025                  * then it will send the request to the target parent */
2026                 target_exp = tgt->ltd_exp;
2027         }
2028
2029         rc = md_rename(target_exp, op_data, old, oldlen, new, newlen,
2030                        request);
2031
2032         if (rc != 0 && rc != -EXDEV)
2033                 RETURN(rc);
2034
2035         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2036         if (body == NULL)
2037                 RETURN(-EPROTO);
2038
2039         /* Not cross-ref case, just get out of here. */
2040         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2041                 RETURN(rc);
2042
2043         CDEBUG(D_INODE, "%s: try rename to another MDT for "DFID"\n",
2044                exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
2045
2046         op_data->op_fid4 = body->mbo_fid1;
2047         ptlrpc_req_finished(*request);
2048         *request = NULL;
2049         goto retry_rename;
2050 }
2051
2052 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2053                        void *ea, size_t ealen, struct ptlrpc_request **request)
2054 {
2055         struct obd_device       *obd = exp->exp_obd;
2056         struct lmv_obd          *lmv = &obd->u.lmv;
2057         struct lmv_tgt_desc     *tgt;
2058         int                      rc = 0;
2059         ENTRY;
2060
2061         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2062                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2063
2064         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2065         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2066         if (IS_ERR(tgt))
2067                 RETURN(PTR_ERR(tgt));
2068
2069         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, request);
2070
2071         RETURN(rc);
2072 }
2073
2074 static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
2075                      struct ptlrpc_request **request)
2076 {
2077         struct obd_device       *obd = exp->exp_obd;
2078         struct lmv_obd          *lmv = &obd->u.lmv;
2079         struct lmv_tgt_desc     *tgt;
2080         int                      rc;
2081         ENTRY;
2082
2083         tgt = lmv_find_target(lmv, fid);
2084         if (IS_ERR(tgt))
2085                 RETURN(PTR_ERR(tgt));
2086
2087         rc = md_fsync(tgt->ltd_exp, fid, request);
2088         RETURN(rc);
2089 }
2090
2091 /**
2092  * Get current minimum entry from striped directory
2093  *
2094  * This function will search the dir entry, whose hash value is the
2095  * closest(>=) to @hash_offset, from all of sub-stripes, and it is
2096  * only being called for striped directory.
2097  *
2098  * \param[in] exp               export of LMV
2099  * \param[in] op_data           parameters transferred beween client MD stack
2100  *                              stripe_information will be included in this
2101  *                              parameter
2102  * \param[in] cb_op             ldlm callback being used in enqueue in
2103  *                              mdc_read_page
2104  * \param[in] hash_offset       the hash value, which is used to locate
2105  *                              minum(closet) dir entry
2106  * \param[in|out] stripe_offset the caller use this to indicate the stripe
2107  *                              index of last entry, so to avoid hash conflict
2108  *                              between stripes. It will also be used to
2109  *                              return the stripe index of current dir entry.
2110  * \param[in|out] entp          the minum entry and it also is being used
2111  *                              to input the last dir entry to resolve the
2112  *                              hash conflict
2113  *
2114  * \param[out] ppage            the page which holds the minum entry
2115  *
2116  * \retval                      = 0 get the entry successfully
2117  *                              negative errno (< 0) does not get the entry
2118  */
2119 static int lmv_get_min_striped_entry(struct obd_export *exp,
2120                                      struct md_op_data *op_data,
2121                                      struct md_callback *cb_op,
2122                                      __u64 hash_offset, int *stripe_offset,
2123                                      struct lu_dirent **entp,
2124                                      struct page **ppage)
2125 {
2126         struct obd_device       *obd = exp->exp_obd;
2127         struct lmv_obd          *lmv = &obd->u.lmv;
2128         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2129         struct lmv_tgt_desc     *tgt;
2130         int                     stripe_count;
2131         struct lu_dirent        *min_ent = NULL;
2132         struct page             *min_page = NULL;
2133         int                     min_idx = 0;
2134         int                     i;
2135         int                     rc = 0;
2136         ENTRY;
2137
2138         stripe_count = lsm->lsm_md_stripe_count;
2139         for (i = 0; i < stripe_count; i++) {
2140                 struct lu_dirent        *ent = NULL;
2141                 struct page             *page = NULL;
2142                 struct lu_dirpage       *dp;
2143                 __u64                   stripe_hash = hash_offset;
2144
2145                 tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
2146                 if (IS_ERR(tgt))
2147                         GOTO(out, rc = PTR_ERR(tgt));
2148
2149                 /* op_data will be shared by each stripe, so we need
2150                  * reset these value for each stripe */
2151                 op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
2152                 op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
2153                 op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
2154 next:
2155                 rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
2156                                   &page);
2157                 if (rc != 0)
2158                         GOTO(out, rc);
2159
2160                 dp = page_address(page);
2161                 for (ent = lu_dirent_start(dp); ent != NULL;
2162                      ent = lu_dirent_next(ent)) {
2163                         /* Skip dummy entry */
2164                         if (le16_to_cpu(ent->lde_namelen) == 0)
2165                                 continue;
2166
2167                         if (le64_to_cpu(ent->lde_hash) < hash_offset)
2168                                 continue;
2169
2170                         if (le64_to_cpu(ent->lde_hash) == hash_offset &&
2171                             (*entp == ent || i < *stripe_offset))
2172                                 continue;
2173
2174                         /* skip . and .. for other stripes */
2175                         if (i != 0 &&
2176                             (strncmp(ent->lde_name, ".",
2177                                      le16_to_cpu(ent->lde_namelen)) == 0 ||
2178                              strncmp(ent->lde_name, "..",
2179                                      le16_to_cpu(ent->lde_namelen)) == 0))
2180                                 continue;
2181                         break;
2182                 }
2183
2184                 if (ent == NULL) {
2185                         stripe_hash = le64_to_cpu(dp->ldp_hash_end);
2186
2187                         kunmap(page);
2188                         page_cache_release(page);
2189                         page = NULL;
2190
2191                         /* reach the end of current stripe, go to next stripe */
2192                         if (stripe_hash == MDS_DIR_END_OFF)
2193                                 continue;
2194                         else
2195                                 goto next;
2196                 }
2197
2198                 if (min_ent != NULL) {
2199                         if (le64_to_cpu(min_ent->lde_hash) >
2200                             le64_to_cpu(ent->lde_hash)) {
2201                                 min_ent = ent;
2202                                 kunmap(min_page);
2203                                 page_cache_release(min_page);
2204                                 min_idx = i;
2205                                 min_page = page;
2206                         } else {
2207                                 kunmap(page);
2208                                 page_cache_release(page);
2209                                 page = NULL;
2210                         }
2211                 } else {
2212                         min_ent = ent;
2213                         min_page = page;
2214                         min_idx = i;
2215                 }
2216         }
2217
2218 out:
2219         if (*ppage != NULL) {
2220                 kunmap(*ppage);
2221                 page_cache_release(*ppage);
2222         }
2223         *stripe_offset = min_idx;
2224         *entp = min_ent;
2225         *ppage = min_page;
2226         RETURN(rc);
2227 }
2228
2229 /**
2230  * Build dir entry page from a striped directory
2231  *
2232  * This function gets one entry by @offset from a striped directory. It will
2233  * read entries from all of stripes, and choose one closest to the required
2234  * offset(&offset). A few notes
2235  * 1. skip . and .. for non-zero stripes, because there can only have one .
2236  * and .. in a directory.
2237  * 2. op_data will be shared by all of stripes, instead of allocating new
2238  * one, so need to restore before reusing.
2239  * 3. release the entry page if that is not being chosen.
2240  *
2241  * \param[in] exp       obd export refer to LMV
2242  * \param[in] op_data   hold those MD parameters of read_entry
2243  * \param[in] cb_op     ldlm callback being used in enqueue in mdc_read_entry
2244  * \param[out] ldp      the entry being read
2245  * \param[out] ppage    the page holding the entry. Note: because the entry
2246  *                      will be accessed in upper layer, so we need hold the
2247  *                      page until the usages of entry is finished, see
2248  *                      ll_dir_entry_next.
2249  *
2250  * retval               =0 if get entry successfully
2251  *                      <0 cannot get entry
2252  */
2253 static int lmv_read_striped_page(struct obd_export *exp,
2254                                  struct md_op_data *op_data,
2255                                  struct md_callback *cb_op,
2256                                  __u64 offset, struct page **ppage)
2257 {
2258         struct lu_fid           master_fid = op_data->op_fid1;
2259         struct inode            *master_inode = op_data->op_data;
2260         __u64                   hash_offset = offset;
2261         struct lu_dirpage       *dp;
2262         struct page             *min_ent_page = NULL;
2263         struct page             *ent_page = NULL;
2264         struct lu_dirent        *ent;
2265         void                    *area;
2266         int                     ent_idx = 0;
2267         struct lu_dirent        *min_ent = NULL;
2268         struct lu_dirent        *last_ent;
2269         size_t                  left_bytes;
2270         int                     rc;
2271         ENTRY;
2272
2273         /* Allocate a page and read entries from all of stripes and fill
2274          * the page by hash order */
2275         ent_page = alloc_page(GFP_KERNEL);
2276         if (ent_page == NULL)
2277                 RETURN(-ENOMEM);
2278
2279         /* Initialize the entry page */
2280         dp = kmap(ent_page);
2281         memset(dp, 0, sizeof(*dp));
2282         dp->ldp_hash_start = cpu_to_le64(offset);
2283         dp->ldp_flags |= LDF_COLLIDE;
2284
2285         area = dp + 1;
2286         left_bytes = PAGE_CACHE_SIZE - sizeof(*dp);
2287         ent = area;
2288         last_ent = ent;
2289         do {
2290                 __u16   ent_size;
2291
2292                 /* Find the minum entry from all sub-stripes */
2293                 rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
2294                                                &ent_idx, &min_ent,
2295                                                &min_ent_page);
2296                 if (rc != 0)
2297                         GOTO(out, rc);
2298
2299                 /* If it can not get minum entry, it means it already reaches
2300                  * the end of this directory */
2301                 if (min_ent == NULL) {
2302                         last_ent->lde_reclen = 0;
2303                         hash_offset = MDS_DIR_END_OFF;
2304                         GOTO(out, rc);
2305                 }
2306
2307                 ent_size = le16_to_cpu(min_ent->lde_reclen);
2308
2309                 /* the last entry lde_reclen is 0, but it might not
2310                  * the end of this entry of this temporay entry */
2311                 if (ent_size == 0)
2312                         ent_size = lu_dirent_calc_size(
2313                                         le16_to_cpu(min_ent->lde_namelen),
2314                                         le32_to_cpu(min_ent->lde_attrs));
2315                 if (ent_size > left_bytes) {
2316                         last_ent->lde_reclen = cpu_to_le16(0);
2317                         hash_offset = le64_to_cpu(min_ent->lde_hash);
2318                         GOTO(out, rc);
2319                 }
2320
2321                 memcpy(ent, min_ent, ent_size);
2322
2323                 /* Replace . with master FID and Replace .. with the parent FID
2324                  * of master object */
2325                 if (strncmp(ent->lde_name, ".",
2326                             le16_to_cpu(ent->lde_namelen)) == 0 &&
2327                     le16_to_cpu(ent->lde_namelen) == 1)
2328                         fid_cpu_to_le(&ent->lde_fid, &master_fid);
2329                 else if (strncmp(ent->lde_name, "..",
2330                                    le16_to_cpu(ent->lde_namelen)) == 0 &&
2331                            le16_to_cpu(ent->lde_namelen) == 2)
2332                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
2333
2334                 left_bytes -= ent_size;
2335                 ent->lde_reclen = cpu_to_le16(ent_size);
2336                 last_ent = ent;
2337                 ent = (void *)ent + ent_size;
2338                 hash_offset = le64_to_cpu(min_ent->lde_hash);
2339                 if (hash_offset == MDS_DIR_END_OFF) {
2340                         last_ent->lde_reclen = 0;
2341                         break;
2342                 }
2343         } while (1);
2344 out:
2345         if (min_ent_page != NULL) {
2346                 kunmap(min_ent_page);
2347                 page_cache_release(min_ent_page);
2348         }
2349
2350         if (unlikely(rc != 0)) {
2351                 __free_page(ent_page);
2352                 ent_page = NULL;
2353         } else {
2354                 if (ent == area)
2355                         dp->ldp_flags |= LDF_EMPTY;
2356                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2357                 dp->ldp_hash_end = cpu_to_le64(hash_offset);
2358         }
2359
2360         /* We do not want to allocate md_op_data during each
2361          * dir entry reading, so op_data will be shared by every stripe,
2362          * then we need to restore it back to original value before
2363          * return to the upper layer */
2364         op_data->op_fid1 = master_fid;
2365         op_data->op_fid2 = master_fid;
2366         op_data->op_data = master_inode;
2367
2368         *ppage = ent_page;
2369
2370         RETURN(rc);
2371 }
2372
2373 int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
2374                   struct md_callback *cb_op, __u64 offset,
2375                   struct page **ppage)
2376 {
2377         struct obd_device       *obd = exp->exp_obd;
2378         struct lmv_obd          *lmv = &obd->u.lmv;
2379         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2380         struct lmv_tgt_desc     *tgt;
2381         int                     rc;
2382         ENTRY;
2383
2384         if (unlikely(lsm != NULL)) {
2385                 rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
2386                 RETURN(rc);
2387         }
2388
2389         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2390         if (IS_ERR(tgt))
2391                 RETURN(PTR_ERR(tgt));
2392
2393         rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
2394
2395         RETURN(rc);
2396 }
2397
2398 /**
2399  * Unlink a file/directory
2400  *
2401  * Unlink a file or directory under the parent dir. The unlink request
2402  * usually will be sent to the MDT where the child is located, but if
2403  * the client does not have the child FID then request will be sent to the
2404  * MDT where the parent is located.
2405  *
2406  * If the parent is a striped directory then it also needs to locate which
2407  * stripe the name of the child is located, and replace the parent FID
2408  * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
2409  * it will walk through all of sub-stripes until the child is being
2410  * unlinked finally.
2411  *
2412  * \param[in] exp       export refer to LMV
2413  * \param[in] op_data   different parameters transferred beween client
2414  *                      MD stacks, name, namelen, FIDs etc.
2415  *                      op_fid1 is the parent FID, op_fid2 is the child
2416  *                      FID.
2417  * \param[out] request  point to the request of unlink.
2418  *
2419  * retval               0 if succeed
2420  *                      negative errno if failed.
2421  */
2422 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2423                       struct ptlrpc_request **request)
2424 {
2425         struct obd_device       *obd = exp->exp_obd;
2426         struct lmv_obd          *lmv = &obd->u.lmv;
2427         struct lmv_tgt_desc     *tgt = NULL;
2428         struct lmv_tgt_desc     *parent_tgt = NULL;
2429         struct mdt_body         *body;
2430         int                     rc;
2431         int                     stripe_index = 0;
2432         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2433         ENTRY;
2434
2435 retry_unlink:
2436         /* For striped dir, we need to locate the parent as well */
2437         if (lsm != NULL) {
2438                 struct lmv_tgt_desc *tmp;
2439
2440                 LASSERT(op_data->op_name != NULL &&
2441                         op_data->op_namelen != 0);
2442
2443                 tmp = lmv_locate_target_for_name(lmv, lsm,
2444                                                  op_data->op_name,
2445                                                  op_data->op_namelen,
2446                                                  &op_data->op_fid1,
2447                                                  &op_data->op_mds);
2448
2449                 /* return -EBADFD means unknown hash type, might
2450                  * need try all sub-stripe here */
2451                 if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD)
2452                         RETURN(PTR_ERR(tmp));
2453
2454                 /* Note: both migrating dir and unknown hash dir need to
2455                  * try all of sub-stripes, so we need start search the
2456                  * name from stripe 0, but migrating dir is already handled
2457                  * inside lmv_locate_target_for_name(), so we only check
2458                  * unknown hash type directory here */
2459                 if (!lmv_is_known_hash_type(lsm->lsm_md_hash_type)) {
2460                         struct lmv_oinfo *oinfo;
2461
2462                         oinfo = &lsm->lsm_md_oinfo[stripe_index];
2463
2464                         op_data->op_fid1 = oinfo->lmo_fid;
2465                         op_data->op_mds = oinfo->lmo_mds;
2466                 }
2467         }
2468
2469 try_next_stripe:
2470         /* Send unlink requests to the MDT where the child is located */
2471         if (likely(!fid_is_zero(&op_data->op_fid2)))
2472                 tgt = lmv_find_target(lmv, &op_data->op_fid2);
2473         else if (lsm != NULL)
2474                 tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
2475         else
2476                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2477
2478         if (IS_ERR(tgt))
2479                 RETURN(PTR_ERR(tgt));
2480
2481         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2482         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2483         op_data->op_cap = cfs_curproc_cap_pack();
2484
2485         /*
2486          * If child's fid is given, cancel unused locks for it if it is from
2487          * another export than parent.
2488          *
2489          * LOOKUP lock for child (fid3) should also be cancelled on parent
2490          * tgt_tgt in mdc_unlink().
2491          */
2492         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2493
2494         /*
2495          * Cancel FULL locks on child (fid3).
2496          */
2497         parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
2498         if (IS_ERR(parent_tgt))
2499                 RETURN(PTR_ERR(parent_tgt));
2500
2501         if (parent_tgt != tgt) {
2502                 rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
2503                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2504                                       MF_MDC_CANCEL_FID3);
2505         }
2506
2507         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
2508                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2509         if (rc != 0)
2510                 RETURN(rc);
2511
2512         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%u\n",
2513                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2514
2515         rc = md_unlink(tgt->ltd_exp, op_data, request);
2516         if (rc != 0 && rc != -EREMOTE && rc != -ENOENT)
2517                 RETURN(rc);
2518
2519         /* Try next stripe if it is needed. */
2520         if (rc == -ENOENT && lsm != NULL && lmv_need_try_all_stripes(lsm)) {
2521                 struct lmv_oinfo *oinfo;
2522
2523                 stripe_index++;
2524                 if (stripe_index >= lsm->lsm_md_stripe_count)
2525                         RETURN(rc);
2526
2527                 oinfo = &lsm->lsm_md_oinfo[stripe_index];
2528
2529                 op_data->op_fid1 = oinfo->lmo_fid;
2530                 op_data->op_mds = oinfo->lmo_mds;
2531
2532                 ptlrpc_req_finished(*request);
2533                 *request = NULL;
2534
2535                 goto try_next_stripe;
2536         }
2537
2538         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2539         if (body == NULL)
2540                 RETURN(-EPROTO);
2541
2542         /* Not cross-ref case, just get out of here. */
2543         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2544                 RETURN(rc);
2545
2546         CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2547                exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
2548
2549         /* This is a remote object, try remote MDT, Note: it may
2550          * try more than 1 time here, Considering following case
2551          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2552          * 1. Initially A does not know where remote1 is, it send
2553          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2554          *    resend unlink RPC to MDT1 (retry 1st time).
2555          *
2556          * 2. During the unlink RPC in flight,
2557          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2558          *    and create new remote1, but on MDT0
2559          *
2560          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2561          *    /mnt/lustre, then lookup get fid of remote1, and find
2562          *    it is remote dir again, and replay -EREMOTE again.
2563          *
2564          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2565          *
2566          * In theory, it might try unlimited time here, but it should
2567          * be very rare case.  */
2568         op_data->op_fid2 = body->mbo_fid1;
2569         ptlrpc_req_finished(*request);
2570         *request = NULL;
2571
2572         goto retry_unlink;
2573 }
2574
2575 static int lmv_precleanup(struct obd_device *obd)
2576 {
2577         ENTRY;
2578         fld_client_proc_fini(&obd->u.lmv.lmv_fld);
2579         lprocfs_obd_cleanup(obd);
2580         lprocfs_free_md_stats(obd);
2581         RETURN(0);
2582 }
2583
2584 /**
2585  * Get by key a value associated with a LMV device.
2586  *
2587  * Dispatch request to lower-layer devices as needed.
2588  *
2589  * \param[in] env               execution environment for this thread
2590  * \param[in] exp               export for the LMV device
2591  * \param[in] keylen            length of key identifier
2592  * \param[in] key               identifier of key to get value for
2593  * \param[in] vallen            size of \a val
2594  * \param[out] val              pointer to storage location for value
2595  * \param[in] lsm               optional striping metadata of object
2596  *
2597  * \retval 0            on success
2598  * \retval negative     negated errno on failure
2599  */
2600 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2601                         __u32 keylen, void *key, __u32 *vallen, void *val)
2602 {
2603         struct obd_device       *obd;
2604         struct lmv_obd          *lmv;
2605         int                      rc = 0;
2606         ENTRY;
2607
2608         obd = class_exp2obd(exp);
2609         if (obd == NULL) {
2610                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2611                        exp->exp_handle.h_cookie);
2612                 RETURN(-EINVAL);
2613         }
2614
2615         lmv = &obd->u.lmv;
2616         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2617                 int i;
2618
2619                 LASSERT(*vallen == sizeof(__u32));
2620                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2621                         struct lmv_tgt_desc *tgt = lmv->tgts[i];
2622                         /*
2623                          * All tgts should be connected when this gets called.
2624                          */
2625                         if (tgt == NULL || tgt->ltd_exp == NULL)
2626                                 continue;
2627
2628                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2629                                           vallen, val))
2630                                 RETURN(0);
2631                 }
2632                 RETURN(-EINVAL);
2633         } else if (KEY_IS(KEY_MAX_EASIZE) ||
2634                    KEY_IS(KEY_DEFAULT_EASIZE) ||
2635                    KEY_IS(KEY_CONN_DATA)) {
2636                 /*
2637                  * Forwarding this request to first MDS, it should know LOV
2638                  * desc.
2639                  */
2640                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2641                                   vallen, val);
2642                 if (!rc && KEY_IS(KEY_CONN_DATA))
2643                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2644                 RETURN(rc);
2645         } else if (KEY_IS(KEY_TGT_COUNT)) {
2646                 *((int *)val) = lmv->desc.ld_tgt_count;
2647                 RETURN(0);
2648         }
2649
2650         CDEBUG(D_IOCTL, "Invalid key\n");
2651         RETURN(-EINVAL);
2652 }
2653
2654 /**
2655  * Asynchronously set by key a value associated with a LMV device.
2656  *
2657  * Dispatch request to lower-layer devices as needed.
2658  *
2659  * \param[in] env       execution environment for this thread
2660  * \param[in] exp       export for the LMV device
2661  * \param[in] keylen    length of key identifier
2662  * \param[in] key       identifier of key to store value for
2663  * \param[in] vallen    size of value to store
2664  * \param[in] val       pointer to data to be stored
2665  * \param[in] set       optional list of related ptlrpc requests
2666  *
2667  * \retval 0            on success
2668  * \retval negative     negated errno on failure
2669  */
2670 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2671                         __u32 keylen, void *key, __u32 vallen, void *val,
2672                         struct ptlrpc_request_set *set)
2673 {
2674         struct lmv_tgt_desc     *tgt = NULL;
2675         struct obd_device       *obd;
2676         struct lmv_obd          *lmv;
2677         int rc = 0;
2678         ENTRY;
2679
2680         obd = class_exp2obd(exp);
2681         if (obd == NULL) {
2682                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2683                        exp->exp_handle.h_cookie);
2684                 RETURN(-EINVAL);
2685         }
2686         lmv = &obd->u.lmv;
2687
2688         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
2689             KEY_IS(KEY_DEFAULT_EASIZE)) {
2690                 int i, err = 0;
2691
2692                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2693                         tgt = lmv->tgts[i];
2694
2695                         if (tgt == NULL || tgt->ltd_exp == NULL)
2696                                 continue;
2697
2698                         err = obd_set_info_async(env, tgt->ltd_exp,
2699                                                  keylen, key, vallen, val, set);
2700                         if (err && rc == 0)
2701                                 rc = err;
2702                 }
2703
2704                 RETURN(rc);
2705         }
2706
2707         RETURN(-EINVAL);
2708 }
2709
2710 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
2711                             const struct lmv_mds_md_v1 *lmm1)
2712 {
2713         struct lmv_obd  *lmv = &exp->exp_obd->u.lmv;
2714         int             stripe_count;
2715         int             cplen;
2716         int             i;
2717         int             rc = 0;
2718         ENTRY;
2719
2720         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
2721         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2722         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
2723         if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE))
2724                 lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN;
2725         else
2726                 lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
2727         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
2728         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
2729                         sizeof(lsm->lsm_md_pool_name));
2730
2731         if (cplen >= sizeof(lsm->lsm_md_pool_name))
2732                 RETURN(-E2BIG);
2733
2734         CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
2735                "layout_version %d\n", lsm->lsm_md_stripe_count,
2736                lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
2737                lsm->lsm_md_layout_version);
2738
2739         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2740         for (i = 0; i < stripe_count; i++) {
2741                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
2742                               &lmm1->lmv_stripe_fids[i]);
2743                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
2744                                     &lsm->lsm_md_oinfo[i].lmo_mds);
2745                 if (rc != 0)
2746                         RETURN(rc);
2747                 CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
2748                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
2749         }
2750
2751         RETURN(rc);
2752 }
2753
2754 static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
2755                         const union lmv_mds_md *lmm, size_t lmm_size)
2756 {
2757         struct lmv_stripe_md     *lsm;
2758         int                      lsm_size;
2759         int                      rc;
2760         bool                     allocated = false;
2761         ENTRY;
2762
2763         LASSERT(lsmp != NULL);
2764
2765         lsm = *lsmp;
2766         /* Free memmd */
2767         if (lsm != NULL && lmm == NULL) {
2768                 int i;
2769                 for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
2770                         /* For migrating inode, the master stripe and master
2771                          * object will be the same, so do not need iput, see
2772                          * ll_update_lsm_md */
2773                         if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
2774                               i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
2775                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
2776                 }
2777                 lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
2778                 OBD_FREE(lsm, lsm_size);
2779                 *lsmp = NULL;
2780                 RETURN(0);
2781         }
2782
2783         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
2784                 RETURN(-EPERM);
2785
2786         /* Unpack memmd */
2787         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
2788             le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
2789                 CERROR("%s: invalid lmv magic %x: rc = %d\n",
2790                        exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
2791                        -EIO);
2792                 RETURN(-EIO);
2793         }
2794
2795         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
2796                 lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2797         else
2798                 /**
2799                  * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
2800                  * stripecount should be 0 then.
2801                  */
2802                 lsm_size = lmv_stripe_md_size(0);
2803
2804         lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2805         if (lsm == NULL) {
2806                 OBD_ALLOC(lsm, lsm_size);
2807                 if (lsm == NULL)
2808                         RETURN(-ENOMEM);
2809                 allocated = true;
2810                 *lsmp = lsm;
2811         }
2812
2813         switch (le32_to_cpu(lmm->lmv_magic)) {
2814         case LMV_MAGIC_V1:
2815                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
2816                 break;
2817         default:
2818                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
2819                        le32_to_cpu(lmm->lmv_magic));
2820                 rc = -EINVAL;
2821                 break;
2822         }
2823
2824         if (rc != 0 && allocated) {
2825                 OBD_FREE(lsm, lsm_size);
2826                 *lsmp = NULL;
2827                 lsm_size = rc;
2828         }
2829         RETURN(lsm_size);
2830 }
2831
2832 void lmv_free_memmd(struct lmv_stripe_md *lsm)
2833 {
2834         lmv_unpackmd(NULL, &lsm, NULL, 0);
2835 }
2836 EXPORT_SYMBOL(lmv_free_memmd);
2837
2838 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2839                              union ldlm_policy_data *policy,
2840                              enum ldlm_mode mode, enum ldlm_cancel_flags flags,
2841                              void *opaque)
2842 {
2843         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2844         int rc = 0;
2845         __u32 i;
2846         ENTRY;
2847
2848         LASSERT(fid != NULL);
2849
2850         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2851                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
2852                 int err;
2853
2854                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
2855                         continue;
2856
2857                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
2858                                        opaque);
2859                 if (!rc)
2860                         rc = err;
2861         }
2862         RETURN(rc);
2863 }
2864
2865 static int lmv_set_lock_data(struct obd_export *exp,
2866                              const struct lustre_handle *lockh,
2867                              void *data, __u64 *bits)
2868 {
2869         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2870         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2871         int                      rc;
2872         ENTRY;
2873
2874         if (tgt == NULL || tgt->ltd_exp == NULL)
2875                 RETURN(-EINVAL);
2876         rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
2877         RETURN(rc);
2878 }
2879
2880 enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags,
2881                               const struct lu_fid *fid, enum ldlm_type type,
2882                               union ldlm_policy_data *policy,
2883                               enum ldlm_mode mode, struct lustre_handle *lockh)
2884 {
2885         struct obd_device       *obd = exp->exp_obd;
2886         struct lmv_obd          *lmv = &obd->u.lmv;
2887         enum ldlm_mode          rc;
2888         int                     tgt;
2889         int                     i;
2890         ENTRY;
2891
2892         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2893
2894         /*
2895          * With DNE every object can have two locks in different namespaces:
2896          * lookup lock in space of MDT storing direntry and update/open lock in
2897          * space of MDT storing inode.  Try the MDT that the FID maps to first,
2898          * since this can be easily found, and only try others if that fails.
2899          */
2900         for (i = 0, tgt = lmv_find_target_index(lmv, fid);
2901              i < lmv->desc.ld_tgt_count;
2902              i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
2903                 if (tgt < 0) {
2904                         CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
2905                                obd->obd_name, PFID(fid), tgt);
2906                         tgt = 0;
2907                 }
2908
2909                 if (lmv->tgts[tgt] == NULL ||
2910                     lmv->tgts[tgt]->ltd_exp == NULL ||
2911                     lmv->tgts[tgt]->ltd_active == 0)
2912                         continue;
2913
2914                 rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid,
2915                                    type, policy, mode, lockh);
2916                 if (rc)
2917                         RETURN(rc);
2918         }
2919
2920         RETURN(0);
2921 }
2922
2923 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2924                       struct obd_export *dt_exp, struct obd_export *md_exp,
2925                       struct lustre_md *md)
2926 {
2927         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2928         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2929
2930         if (tgt == NULL || tgt->ltd_exp == NULL)
2931                 RETURN(-EINVAL);
2932
2933         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2934 }
2935
2936 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2937 {
2938         struct obd_device       *obd = exp->exp_obd;
2939         struct lmv_obd          *lmv = &obd->u.lmv;
2940         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2941         ENTRY;
2942
2943         if (md->lmv != NULL) {
2944                 lmv_free_memmd(md->lmv);
2945                 md->lmv = NULL;
2946         }
2947         if (tgt == NULL || tgt->ltd_exp == NULL)
2948                 RETURN(-EINVAL);
2949         RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md));
2950 }
2951
2952 int lmv_set_open_replay_data(struct obd_export *exp,
2953                              struct obd_client_handle *och,
2954                              struct lookup_intent *it)
2955 {
2956         struct obd_device       *obd = exp->exp_obd;
2957         struct lmv_obd          *lmv = &obd->u.lmv;
2958         struct lmv_tgt_desc     *tgt;
2959         ENTRY;
2960
2961         tgt = lmv_find_target(lmv, &och->och_fid);
2962         if (IS_ERR(tgt))
2963                 RETURN(PTR_ERR(tgt));
2964
2965         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it));
2966 }
2967
2968 int lmv_clear_open_replay_data(struct obd_export *exp,
2969                                struct obd_client_handle *och)
2970 {
2971         struct obd_device       *obd = exp->exp_obd;
2972         struct lmv_obd          *lmv = &obd->u.lmv;
2973         struct lmv_tgt_desc     *tgt;
2974         ENTRY;
2975
2976         tgt = lmv_find_target(lmv, &och->och_fid);
2977         if (IS_ERR(tgt))
2978                 RETURN(PTR_ERR(tgt));
2979
2980         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
2981 }
2982
2983 int lmv_intent_getattr_async(struct obd_export *exp,
2984                              struct md_enqueue_info *minfo)
2985 {
2986         struct md_op_data       *op_data = &minfo->mi_data;
2987         struct obd_device       *obd = exp->exp_obd;
2988         struct lmv_obd          *lmv = &obd->u.lmv;
2989         struct lmv_tgt_desc     *ptgt = NULL;
2990         struct lmv_tgt_desc     *ctgt = NULL;
2991         int                      rc;
2992         ENTRY;
2993
2994         if (!fid_is_sane(&op_data->op_fid2))
2995                 RETURN(-EINVAL);
2996
2997         ptgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2998         if (IS_ERR(ptgt))
2999                 RETURN(PTR_ERR(ptgt));
3000
3001         ctgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
3002         if (IS_ERR(ctgt))
3003                 RETURN(PTR_ERR(ctgt));
3004
3005         /*
3006          * if child is on remote MDT, we need 2 async RPCs to fetch both LOOKUP
3007          * lock on parent, and UPDATE lock on child MDT, which makes all
3008          * complicated. Considering remote dir is rare case, and not supporting
3009          * it in statahead won't cause any issue, drop its support for now.
3010          */
3011         if (ptgt != ctgt)
3012                 RETURN(-ENOTSUPP);
3013
3014         rc = md_intent_getattr_async(ptgt->ltd_exp, minfo);
3015         RETURN(rc);
3016 }
3017
3018 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
3019                         struct lu_fid *fid, __u64 *bits)
3020 {
3021         struct obd_device       *obd = exp->exp_obd;
3022         struct lmv_obd          *lmv = &obd->u.lmv;
3023         struct lmv_tgt_desc     *tgt;
3024         int                      rc;
3025         ENTRY;
3026
3027         tgt = lmv_find_target(lmv, fid);
3028         if (IS_ERR(tgt))
3029                 RETURN(PTR_ERR(tgt));
3030
3031         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
3032         RETURN(rc);
3033 }
3034
3035 int lmv_get_fid_from_lsm(struct obd_export *exp,
3036                          const struct lmv_stripe_md *lsm,
3037                          const char *name, int namelen, struct lu_fid *fid)
3038 {
3039         const struct lmv_oinfo *oinfo;
3040
3041         LASSERT(lsm != NULL);
3042         oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
3043         if (IS_ERR(oinfo))
3044                 return PTR_ERR(oinfo);
3045
3046         *fid = oinfo->lmo_fid;
3047
3048         RETURN(0);
3049 }
3050
3051 /**
3052  * For lmv, only need to send request to master MDT, and the master MDT will
3053  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
3054  * we directly fetch data from the slave MDTs.
3055  */
3056 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
3057                  struct obd_quotactl *oqctl)
3058 {
3059         struct obd_device   *obd = class_exp2obd(exp);
3060         struct lmv_obd      *lmv = &obd->u.lmv;
3061         struct lmv_tgt_desc *tgt = lmv->tgts[0];
3062         int                  rc = 0;
3063         __u32                i;
3064         __u64                curspace, curinodes;
3065         ENTRY;
3066
3067         if (tgt == NULL ||
3068             tgt->ltd_exp == NULL ||
3069             !tgt->ltd_active ||
3070             lmv->desc.ld_tgt_count == 0) {
3071                 CERROR("master lmv inactive\n");
3072                 RETURN(-EIO);
3073         }
3074
3075         if (oqctl->qc_cmd != Q_GETOQUOTA) {
3076                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
3077                 RETURN(rc);
3078         }
3079
3080         curspace = curinodes = 0;
3081         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3082                 int err;
3083                 tgt = lmv->tgts[i];
3084
3085                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
3086                         continue;
3087
3088                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3089                 if (err) {
3090                         CERROR("getquota on mdt %d failed. %d\n", i, err);
3091                         if (!rc)
3092                                 rc = err;
3093                 } else {
3094                         curspace += oqctl->qc_dqblk.dqb_curspace;
3095                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3096                 }
3097         }
3098         oqctl->qc_dqblk.dqb_curspace = curspace;
3099         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3100
3101         RETURN(rc);
3102 }
3103
3104 static int lmv_merge_attr(struct obd_export *exp,
3105                           const struct lmv_stripe_md *lsm,
3106                           struct cl_attr *attr,
3107                           ldlm_blocking_callback cb_blocking)
3108 {
3109         int rc;
3110         int i;
3111
3112         rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
3113         if (rc < 0)
3114                 return rc;
3115
3116         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3117                 struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
3118
3119                 CDEBUG(D_INFO, ""DFID" size %llu, blocks %llu nlink %u,"
3120                        " atime %lu ctime %lu, mtime %lu.\n",
3121                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
3122                        i_size_read(inode), (unsigned long long)inode->i_blocks,
3123                        inode->i_nlink, LTIME_S(inode->i_atime),
3124                        LTIME_S(inode->i_ctime), LTIME_S(inode->i_mtime));
3125
3126                 /* for slave stripe, it needs to subtract nlink for . and .. */
3127                 if (i != 0)
3128                         attr->cat_nlink += inode->i_nlink - 2;
3129                 else
3130                         attr->cat_nlink = inode->i_nlink;
3131
3132                 attr->cat_size += i_size_read(inode);
3133                 attr->cat_blocks += inode->i_blocks;
3134
3135                 if (attr->cat_atime < LTIME_S(inode->i_atime))
3136                         attr->cat_atime = LTIME_S(inode->i_atime);
3137
3138                 if (attr->cat_ctime < LTIME_S(inode->i_ctime))
3139                         attr->cat_ctime = LTIME_S(inode->i_ctime);
3140
3141                 if (attr->cat_mtime < LTIME_S(inode->i_mtime))
3142                         attr->cat_mtime = LTIME_S(inode->i_mtime);
3143         }
3144         return 0;
3145 }
3146
3147 struct obd_ops lmv_obd_ops = {
3148         .o_owner                = THIS_MODULE,
3149         .o_setup                = lmv_setup,
3150         .o_cleanup              = lmv_cleanup,
3151         .o_precleanup           = lmv_precleanup,
3152         .o_process_config       = lmv_process_config,
3153         .o_connect              = lmv_connect,
3154         .o_disconnect           = lmv_disconnect,
3155         .o_statfs               = lmv_statfs,
3156         .o_get_info             = lmv_get_info,
3157         .o_set_info_async       = lmv_set_info_async,
3158         .o_notify               = lmv_notify,
3159         .o_get_uuid             = lmv_get_uuid,
3160         .o_iocontrol            = lmv_iocontrol,
3161         .o_quotactl             = lmv_quotactl
3162 };
3163
3164 struct md_ops lmv_md_ops = {
3165         .m_get_root             = lmv_get_root,
3166         .m_null_inode           = lmv_null_inode,
3167         .m_close                = lmv_close,
3168         .m_create               = lmv_create,
3169         .m_enqueue              = lmv_enqueue,
3170         .m_getattr              = lmv_getattr,
3171         .m_getxattr             = lmv_getxattr,
3172         .m_getattr_name         = lmv_getattr_name,
3173         .m_intent_lock          = lmv_intent_lock,
3174         .m_link                 = lmv_link,
3175         .m_rename               = lmv_rename,
3176         .m_setattr              = lmv_setattr,
3177         .m_setxattr             = lmv_setxattr,
3178         .m_fsync                = lmv_fsync,
3179         .m_read_page            = lmv_read_page,
3180         .m_unlink               = lmv_unlink,
3181         .m_init_ea_size         = lmv_init_ea_size,
3182         .m_cancel_unused        = lmv_cancel_unused,
3183         .m_set_lock_data        = lmv_set_lock_data,
3184         .m_lock_match           = lmv_lock_match,
3185         .m_get_lustre_md        = lmv_get_lustre_md,
3186         .m_free_lustre_md       = lmv_free_lustre_md,
3187         .m_merge_attr           = lmv_merge_attr,
3188         .m_set_open_replay_data = lmv_set_open_replay_data,
3189         .m_clear_open_replay_data = lmv_clear_open_replay_data,
3190         .m_intent_getattr_async = lmv_intent_getattr_async,
3191         .m_revalidate_lock      = lmv_revalidate_lock,
3192         .m_get_fid_from_lsm     = lmv_get_fid_from_lsm,
3193