Whamcloud - gitweb
LU-8998 clio: getstripe support comp layout
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Implementation of cl_object for LOV layer.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LOV
39
40 #include "lov_cl_internal.h"
41
42 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
43 {
44         return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
45 }
46
47 /** \addtogroup lov
48  *  @{
49  */
50
51 /*****************************************************************************
52  *
53  * Layout operations.
54  *
55  */
56
57 struct lov_layout_operations {
58         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
59                         struct lov_object *lov, struct lov_stripe_md *lsm,
60                         const struct cl_object_conf *conf,
61                         union lov_layout_state *state);
62         int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
63                            union lov_layout_state *state);
64         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
65                          union lov_layout_state *state);
66         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
67                             union lov_layout_state *state);
68         int  (*llo_print)(const struct lu_env *env, void *cookie,
69                           lu_printer_t p, const struct lu_object *o);
70         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
71                               struct cl_page *page, pgoff_t index);
72         int  (*llo_lock_init)(const struct lu_env *env,
73                               struct cl_object *obj, struct cl_lock *lock,
74                               const struct cl_io *io);
75         int  (*llo_io_init)(const struct lu_env *env,
76                             struct cl_object *obj, struct cl_io *io);
77         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
78                             struct cl_attr *attr);
79 };
80
/* Forward declaration: the llo_delete methods below call this function
 * before its definition appears later in the file. */
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
82
83 static void lov_lsm_put(struct lov_stripe_md *lsm)
84 {
85         if (lsm != NULL)
86                 lov_free_memmd(&lsm);
87 }
88
89 /*****************************************************************************
90  *
91  * Lov object layout operations.
92  *
93  */
94
/**
 * llo_install method for layouts without objects (used by the LLT_EMPTY and
 * LLT_RELEASED entries of lov_dispatch[]): there is nothing to install.
 */
static void lov_install_empty(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* File without objects: intentionally empty. */
}
103
/**
 * llo_init method for the LLT_EMPTY layout: no state to set up.
 *
 * \retval 0 always
 */
static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
                          union lov_layout_state *state)
{
        /* nothing to initialize for a file without objects */
        return 0;
}
111
/**
 * llo_install method for the LLT_COMP layout: currently a no-op.
 */
static void lov_install_composite(const struct lu_env *env,
                                  struct lov_object *lov,
                                  union lov_layout_state *state)
{
        /* intentionally empty */
}
117
118 static struct cl_object *lov_sub_find(const struct lu_env *env,
119                                       struct cl_device *dev,
120                                       const struct lu_fid *fid,
121                                       const struct cl_object_conf *conf)
122 {
123         struct lu_object *o;
124
125         ENTRY;
126         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
127         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
128         RETURN(lu2cl(o));
129 }
130
131 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
132                         struct cl_object *subobj, struct lov_layout_raid0 *r0,
133                         struct lov_oinfo *oinfo, int idx)
134 {
135         struct cl_object_header *hdr;
136         struct cl_object_header *subhdr;
137         struct cl_object_header *parent;
138         int entry = lov_comp_entry(idx);
139         int stripe = lov_comp_stripe(idx);
140         int result;
141
142         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
143                 /* For sanity:test_206.
144                  * Do not leave the object in cache to avoid accessing
145                  * freed memory. This is because osc_object is referring to
146                  * lov_oinfo of lsm_stripe_data which will be freed due to
147                  * this failure. */
148                 cl_object_kill(env, subobj);
149                 cl_object_put(env, subobj);
150                 return -EIO;
151         }
152
153         hdr    = cl_object_header(lov2cl(lov));
154         subhdr = cl_object_header(subobj);
155
156         CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID
157                " ost idx: %d gen: %d\n",
158                PFID(lu_object_fid(&subobj->co_lu)), subhdr, entry, stripe,
159                PFID(lu_object_fid(lov2lu(lov))), hdr, POSTID(&oinfo->loi_oi),
160                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
161
162         /* reuse ->coh_attr_guard to protect coh_parent change */
163         spin_lock(&subhdr->coh_attr_guard);
164         parent = subhdr->coh_parent;
165         if (parent == NULL) {
166                 subhdr->coh_parent = hdr;
167                 spin_unlock(&subhdr->coh_attr_guard);
168                 subhdr->coh_nesting = hdr->coh_nesting + 1;
169                 lu_object_ref_add(&subobj->co_lu, "lov-parent", lov);
170                 r0->lo_sub[stripe] = cl2lovsub(subobj);
171                 r0->lo_sub[stripe]->lso_super = lov;
172                 r0->lo_sub[stripe]->lso_index = idx;
173                 result = 0;
174         } else {
175                 struct lu_object  *old_obj;
176                 struct lov_object *old_lov;
177                 unsigned int mask = D_INODE;
178
179                 spin_unlock(&subhdr->coh_attr_guard);
180                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
181                 LASSERT(old_obj != NULL);
182                 old_lov = cl2lov(lu2cl(old_obj));
183                 if (old_lov->lo_layout_invalid) {
184                         /* the object's layout has already changed but isn't
185                          * refreshed */
186                         lu_object_unhash(env, &subobj->co_lu);
187                         result = -EAGAIN;
188                 } else {
189                         mask = D_ERROR;
190                         result = -EIO;
191                 }
192
193                 LU_OBJECT_DEBUG(mask, env, &subobj->co_lu,
194                                 "stripe %d is already owned.", idx);
195                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
196                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
197                 cl_object_put(env, subobj);
198         }
199         return result;
200 }
201
202 static int lov_page_slice_fixup(struct lov_object *lov,
203                                 struct cl_object *stripe)
204 {
205         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
206         struct cl_object *o;
207
208         if (stripe == NULL)
209                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
210                        cfs_size_round(sizeof(struct lov_page));
211
212         cl_object_for_each(o, stripe)
213                 o->co_slice_off += hdr->coh_page_bufsize;
214
215         return cl_object_header(stripe)->coh_page_bufsize;
216 }
217
218 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
219                           struct lov_object *lov, int index,
220                           struct lov_layout_raid0 *r0)
221 {
222         struct lov_thread_info  *lti     = lov_env_info(env);
223         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
224         struct lu_fid           *ofid    = &lti->lti_fid;
225         struct cl_object        *stripe;
226         struct lov_stripe_md_entry *lse  = lov_lse(lov, index);
227         int result;
228         int psz;
229         int i;
230
231         ENTRY;
232
233         spin_lock_init(&r0->lo_sub_lock);
234         r0->lo_nr = lse->lsme_stripe_count;
235         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
236
237         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
238         if (r0->lo_sub == NULL)
239                 GOTO(out, result = -ENOMEM);
240
241         psz = 0;
242         result = 0;
243         memset(subconf, 0, sizeof(*subconf));
244
245         /*
246          * Create stripe cl_objects.
247          */
248         for (i = 0; i < r0->lo_nr; ++i) {
249                 struct cl_device *subdev;
250                 struct lov_oinfo *oinfo = lse->lsme_oinfo[i];
251                 int ost_idx = oinfo->loi_ost_idx;
252
253                 if (lov_oinfo_is_dummy(oinfo))
254                         continue;
255
256                 result = ostid_to_fid(ofid, &oinfo->loi_oi, oinfo->loi_ost_idx);
257                 if (result != 0)
258                         GOTO(out, result);
259
260                 if (dev->ld_target[ost_idx] == NULL) {
261                         CERROR("%s: OST %04x is not initialized\n",
262                                lov2obd(dev->ld_lov)->obd_name, ost_idx);
263                         GOTO(out, result = -EIO);
264                 }
265
266                 subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
267                 subconf->u.coc_oinfo = oinfo;
268                 LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
269                 /* In the function below, .hs_keycmp resolves to
270                  * lu_obj_hop_keycmp() */
271                 /* coverity[overrun-buffer-val] */
272                 stripe = lov_sub_find(env, subdev, ofid, subconf);
273                 if (IS_ERR(stripe))
274                         GOTO(out, result = PTR_ERR(stripe));
275
276                 result = lov_init_sub(env, lov, stripe, r0, oinfo,
277                                       lov_comp_index(index, i));
278                 if (result == -EAGAIN) { /* try again */
279                         --i;
280                         result = 0;
281                         continue;
282                 }
283
284                 if (result == 0) {
285                         int sz = lov_page_slice_fixup(lov, stripe);
286                         LASSERT(ergo(psz > 0, psz == sz));
287                         psz = sz;
288                 }
289         }
290         if (result == 0)
291                 result = psz;
292 out:
293         RETURN(result);
294 }
295
296 static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
297                               struct lov_object *lov, struct lov_stripe_md *lsm,
298                               const struct cl_object_conf *conf,
299                               union lov_layout_state *state)
300 {
301         struct lov_layout_composite *comp = &state->composite;
302         unsigned int entry_count;
303         unsigned int psz = 0;
304         int result = 0;
305         int i;
306
307         ENTRY;
308
309         LASSERT(lsm->lsm_entry_count > 0);
310         LASSERT(lov->lo_lsm == NULL);
311         lov->lo_lsm = lsm_addref(lsm);
312         lov->lo_layout_invalid = true;
313
314         entry_count = lsm->lsm_entry_count;
315         comp->lo_entry_count = entry_count;
316
317         OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
318         if (comp->lo_entries == NULL)
319                 RETURN(-ENOMEM);
320
321         for (i = 0; i < entry_count; i++) {
322                 struct lov_layout_entry *le = &comp->lo_entries[i];
323
324                 le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
325                 result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
326                 if (result < 0)
327                         break;
328
329                 LASSERT(ergo(psz > 0, psz == result));
330                 psz = result;
331         }
332         if (psz > 0)
333                 cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
334
335         return result > 0 ? 0 : result;
336 }
337
338 static int lov_init_released(const struct lu_env *env,
339                              struct lov_device *dev, struct lov_object *lov,
340                              struct lov_stripe_md *lsm,
341                              const struct cl_object_conf *conf,
342                              union lov_layout_state *state)
343 {
344         LASSERT(lsm != NULL);
345         LASSERT(lsm->lsm_is_released);
346         LASSERT(lov->lo_lsm == NULL);
347
348         lov->lo_lsm = lsm_addref(lsm);
349         return 0;
350 }
351
352 static struct cl_object *lov_find_subobj(const struct lu_env *env,
353                                          struct lov_object *lov,
354                                          struct lov_stripe_md *lsm,
355                                          int index)
356 {
357         struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
358         struct lov_thread_info  *lti = lov_env_info(env);
359         struct lu_fid           *ofid = &lti->lti_fid;
360         struct lov_oinfo        *oinfo;
361         struct cl_device        *subdev;
362         int                     entry = lov_comp_entry(index);
363         int                     stripe = lov_comp_stripe(index);
364         int                     ost_idx;
365         int                     rc;
366         struct cl_object        *result;
367
368         if (lov->lo_type != LLT_COMP)
369                 GOTO(out, result = NULL);
370
371         if (entry >= lsm->lsm_entry_count ||
372             stripe >= lsm->lsm_entries[entry]->lsme_stripe_count)
373                 GOTO(out, result = NULL);
374
375         oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe];
376         ost_idx = oinfo->loi_ost_idx;
377         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
378         if (rc != 0)
379                 GOTO(out, result = NULL);
380
381         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
382         result = lov_sub_find(env, subdev, ofid, NULL);
383 out:
384         if (result == NULL)
385                 result = ERR_PTR(-EINVAL);
386         return result;
387 }
388
389 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
390                             union lov_layout_state *state)
391 {
392         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
393
394         lov_layout_wait(env, lov);
395         return 0;
396 }
397
398 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
399                                struct lov_layout_raid0 *r0,
400                                struct lovsub_object *los, int idx)
401 {
402         struct cl_object        *sub;
403         struct lu_site          *site;
404         struct lu_site_bkt_data *bkt;
405         wait_queue_t          *waiter;
406
407         LASSERT(r0->lo_sub[idx] == los);
408
409         sub  = lovsub2cl(los);
410         site = sub->co_lu.lo_dev->ld_site;
411         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
412
413         cl_object_kill(env, sub);
414         /* release a reference to the sub-object and ... */
415         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
416         cl_object_put(env, sub);
417
418         /* ... wait until it is actually destroyed---sub-object clears its
419          * ->lo_sub[] slot in lovsub_object_fini() */
420         if (r0->lo_sub[idx] == los) {
421                 waiter = &lov_env_info(env)->lti_waiter;
422                 init_waitqueue_entry(waiter, current);
423                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
424                 set_current_state(TASK_UNINTERRUPTIBLE);
425                 while (1) {
426                         /* this wait-queue is signaled at the end of
427                          * lu_object_free(). */
428                         set_current_state(TASK_UNINTERRUPTIBLE);
429                         spin_lock(&r0->lo_sub_lock);
430                         if (r0->lo_sub[idx] == los) {
431                                 spin_unlock(&r0->lo_sub_lock);
432                                 schedule();
433                         } else {
434                                 spin_unlock(&r0->lo_sub_lock);
435                                 set_current_state(TASK_RUNNING);
436                                 break;
437                         }
438                 }
439                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
440         }
441         LASSERT(r0->lo_sub[idx] == NULL);
442 }
443
444 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
445                              struct lov_layout_raid0 *r0)
446 {
447         ENTRY;
448
449         if (r0->lo_sub != NULL) {
450                 int i;
451
452                 for (i = 0; i < r0->lo_nr; ++i) {
453                         struct lovsub_object *los = r0->lo_sub[i];
454
455                         if (los != NULL) {
456                                 cl_object_prune(env, &los->lso_cl);
457                                 /*
458                                  * If top-level object is to be evicted from
459                                  * the cache, so are its sub-objects.
460                                  */
461                                 lov_subobject_kill(env, lov, r0, los, i);
462                         }
463                 }
464         }
465
466         EXIT;
467 }
468
469 static int lov_delete_composite(const struct lu_env *env,
470                                 struct lov_object *lov,
471                                 union lov_layout_state *state)
472 {
473         struct lov_layout_entry *entry;
474
475         ENTRY;
476
477         dump_lsm(D_INODE, lov->lo_lsm);
478
479         lov_layout_wait(env, lov);
480         lov_foreach_layout_entry(lov, entry)
481                 lov_delete_raid0(env, lov, &entry->lle_raid0);
482
483         RETURN(0);
484 }
485
486 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
487                            union lov_layout_state *state)
488 {
489         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
490 }
491
492 static void lov_fini_raid0(const struct lu_env *env,
493                            struct lov_layout_raid0 *r0)
494 {
495         if (r0->lo_sub != NULL) {
496                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
497                 r0->lo_sub = NULL;
498         }
499 }
500
501 static void lov_fini_composite(const struct lu_env *env,
502                                struct lov_object *lov,
503                                union lov_layout_state *state)
504 {
505         struct lov_layout_composite *comp = &state->composite;
506         ENTRY;
507
508         if (comp->lo_entries != NULL) {
509                 struct lov_layout_entry *entry;
510
511                 lov_foreach_layout_entry(lov, entry)
512                         lov_fini_raid0(env, &entry->lle_raid0);
513
514                 OBD_FREE(comp->lo_entries,
515                          comp->lo_entry_count * sizeof(*comp->lo_entries));
516                 comp->lo_entries = NULL;
517         }
518
519         dump_lsm(D_INODE, lov->lo_lsm);
520         lov_free_memmd(&lov->lo_lsm);
521
522         EXIT;
523 }
524
525 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
526                                 union lov_layout_state *state)
527 {
528         ENTRY;
529         dump_lsm(D_INODE, lov->lo_lsm);
530         lov_free_memmd(&lov->lo_lsm);
531         EXIT;
532 }
533
534 static int lov_print_empty(const struct lu_env *env, void *cookie,
535                            lu_printer_t p, const struct lu_object *o)
536 {
537         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
538         return 0;
539 }
540
541 static int lov_print_raid0(const struct lu_env *env, void *cookie,
542                            lu_printer_t p, struct lov_layout_raid0 *r0)
543 {
544         int i;
545
546         for (i = 0; i < r0->lo_nr; ++i) {
547                 struct lu_object *sub;
548
549                 if (r0->lo_sub[i] != NULL) {
550                         sub = lovsub2lu(r0->lo_sub[i]);
551                         lu_object_print(env, cookie, p, sub);
552                 } else {
553                         (*p)(env, cookie, "sub %d absent\n", i);
554                 }
555         }
556         return 0;
557 }
558
559 static int lov_print_composite(const struct lu_env *env, void *cookie,
560                                lu_printer_t p, const struct lu_object *o)
561 {
562         struct lov_object *lov = lu2lov(o);
563         struct lov_stripe_md *lsm = lov->lo_lsm;
564         int i;
565
566         (*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n",
567              lsm->lsm_entry_count,
568              lov->lo_layout_invalid ? "invalid" : "valid", lsm,
569              lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
570              lsm->lsm_layout_gen);
571
572         for (i = 0; i < lsm->lsm_entry_count; i++) {
573                 struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
574
575                 (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n",
576                      PEXT(&lse->lsme_extent), lse->lsme_magic,
577                      lse->lsme_id, lse->lsme_layout_gen,
578                      lse->lsme_stripe_count, lse->lsme_stripe_size);
579                 lov_print_raid0(env, cookie, p, lov_r0(lov, i));
580         }
581
582         return 0;
583 }
584
585 static int lov_print_released(const struct lu_env *env, void *cookie,
586                                 lu_printer_t p, const struct lu_object *o)
587 {
588         struct lov_object       *lov = lu2lov(o);
589         struct lov_stripe_md    *lsm = lov->lo_lsm;
590
591         (*p)(env, cookie,
592                 "released: %s, lsm{%p 0x%08X %d %u}:\n",
593                 lov->lo_layout_invalid ? "invalid" : "valid", lsm,
594                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
595                 lsm->lsm_layout_gen);
596         return 0;
597 }
598
599 /**
600  * Implements cl_object_operations::coo_attr_get() method for an object
601  * without stripes (LLT_EMPTY layout type).
602  *
603  * The only attributes this layer is authoritative in this case is
604  * cl_attr::cat_blocks---it's 0.
605  */
606 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
607                               struct cl_attr *attr)
608 {
609         attr->cat_blocks = 0;
610         return 0;
611 }
612
613 static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov,
614                               unsigned int index, struct lov_layout_raid0 *r0)
615
616 {
617         struct lov_stripe_md *lsm = lov->lo_lsm;
618         struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
619         struct cl_attr *attr = &r0->lo_attr;
620         __u64 kms = 0;
621         int result = 0;
622
623         if (r0->lo_attr_valid)
624                 return 0;
625
626         memset(lvb, 0, sizeof(*lvb));
627
628         /* XXX: timestamps can be negative by sanity:test_39m,
629          * how can it be? */
630         lvb->lvb_atime = LLONG_MIN;
631         lvb->lvb_ctime = LLONG_MIN;
632         lvb->lvb_mtime = LLONG_MIN;
633
634         /*
635          * XXX that should be replaced with a loop over sub-objects,
636          * doing cl_object_attr_get() on them. But for now, let's
637          * reuse old lov code.
638          */
639
640         /*
641          * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
642          * happy. It's not needed, because new code uses
643          * ->coh_attr_guard spin-lock to protect consistency of
644          * sub-object attributes.
645          */
646         lov_stripe_lock(lsm);
647         result = lov_merge_lvb_kms(lsm, index, lvb, &kms);
648         lov_stripe_unlock(lsm);
649         if (result == 0) {
650                 cl_lvb2attr(attr, lvb);
651                 attr->cat_kms = kms;
652                 r0->lo_attr_valid = 1;
653         }
654
655         return result;
656 }
657
658 static int lov_attr_get_composite(const struct lu_env *env,
659                                   struct cl_object *obj,
660                                   struct cl_attr *attr)
661 {
662         struct lov_object       *lov = cl2lov(obj);
663         struct lov_layout_entry *entry;
664         int                      result = 0;
665         int                      index = 0;
666
667         ENTRY;
668
669         attr->cat_size = 0;
670         attr->cat_blocks = 0;
671         lov_foreach_layout_entry(lov, entry) {
672                 struct lov_layout_raid0 *r0 = &entry->lle_raid0;
673                 struct cl_attr *lov_attr = &r0->lo_attr;
674
675                 result = lov_attr_get_raid0(env, lov, index, r0);
676                 if (result != 0)
677                         break;
678
679                 index++;
680
681                 /* merge results */
682                 attr->cat_blocks += lov_attr->cat_blocks;
683                 if (attr->cat_size < lov_attr->cat_size)
684                         attr->cat_size = lov_attr->cat_size;
685                 if (attr->cat_kms < lov_attr->cat_kms)
686                         attr->cat_kms = lov_attr->cat_kms;
687                 if (attr->cat_atime < lov_attr->cat_atime)
688                         attr->cat_atime = lov_attr->cat_atime;
689                 if (attr->cat_ctime < lov_attr->cat_ctime)
690                         attr->cat_ctime = lov_attr->cat_ctime;
691                 if (attr->cat_mtime < lov_attr->cat_mtime)
692                         attr->cat_mtime = lov_attr->cat_mtime;
693         }
694         RETURN(result);
695 }
696
697 const static struct lov_layout_operations lov_dispatch[] = {
698         [LLT_EMPTY] = {
699                 .llo_init      = lov_init_empty,
700                 .llo_delete    = lov_delete_empty,
701                 .llo_fini      = lov_fini_empty,
702                 .llo_install   = lov_install_empty,
703                 .llo_print     = lov_print_empty,
704                 .llo_page_init = lov_page_init_empty,
705                 .llo_lock_init = lov_lock_init_empty,
706                 .llo_io_init   = lov_io_init_empty,
707                 .llo_getattr   = lov_attr_get_empty,
708         },
709         [LLT_RELEASED] = {
710                 .llo_init      = lov_init_released,
711                 .llo_delete    = lov_delete_empty,
712                 .llo_fini      = lov_fini_released,
713                 .llo_install   = lov_install_empty,
714                 .llo_print     = lov_print_released,
715                 .llo_page_init = lov_page_init_empty,
716                 .llo_lock_init = lov_lock_init_empty,
717                 .llo_io_init   = lov_io_init_released,
718                 .llo_getattr   = lov_attr_get_empty,
719         },
720         [LLT_COMP] = {
721                 .llo_init      = lov_init_composite,
722                 .llo_delete    = lov_delete_composite,
723                 .llo_fini      = lov_fini_composite,
724                 .llo_install   = lov_install_composite,
725                 .llo_print     = lov_print_composite,
726                 .llo_page_init = lov_page_init_composite,
727                 .llo_lock_init = lov_lock_init_composite,
728                 .llo_io_init   = lov_io_init_composite,
729                 .llo_getattr   = lov_attr_get_composite,
730         },
731 };
732
/**
 * Performs a double-dispatch based on the layout type of an object:
 * reads \a obj's lo_type without taking any lock and calls method \a op of
 * the matching lov_dispatch[] entry with the remaining arguments.  The
 * value of the expression is the method's return value.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                      \
({                                                              \
        struct lov_object    *__obj = (obj);                    \
        enum lov_layout_type  __llt = __obj->lo_type;           \
                                                                \
        LASSERT(__llt < ARRAY_SIZE(lov_dispatch));              \
        lov_dispatch[__llt].op(__VA_ARGS__);                    \
})
745
746 /**
747  * Return lov_layout_type associated with a given lsm
748  */
749 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
750 {
751         if (lsm == NULL)
752                 return LLT_EMPTY;
753
754         if (lsm->lsm_is_released)
755                 return LLT_RELEASED;
756
757         if (lsm->lsm_magic == LOV_MAGIC_V1 ||
758             lsm->lsm_magic == LOV_MAGIC_V3 ||
759             lsm->lsm_magic == LOV_MAGIC_COMP_V1)
760                 return LLT_COMP;
761
762         return LLT_EMPTY;
763 }
764
765 static inline void lov_conf_freeze(struct lov_object *lov)
766 {
767         CDEBUG(D_INODE, "To take share lov(%p) owner %p/%p\n",
768                 lov, lov->lo_owner, current);
769         if (lov->lo_owner != current)
770                 down_read(&lov->lo_type_guard);
771 }
772
773 static inline void lov_conf_thaw(struct lov_object *lov)
774 {
775         CDEBUG(D_INODE, "To release share lov(%p) owner %p/%p\n",
776                 lov, lov->lo_owner, current);
777         if (lov->lo_owner != current)
778                 up_read(&lov->lo_type_guard);
779 }
780
/**
 * Double-dispatch on the layout type of \a obj, optionally bracketed by
 * lov_conf_freeze()/lov_conf_thaw() when \a lock is non-zero.  The value of
 * the expression is the return value of method \a op.
 *
 * \a obj is evaluated exactly once.  The cached pointer uses a name that
 * differs from the __obj local of LOV_2DISPATCH_NOLOCK(), so passing it on
 * does not turn into a self-initialization there.
 */
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
({                                                                      \
        struct lov_object                      *__maylock_obj = (obj);  \
        int                                     __lock = !!(lock);      \
        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
                                                                        \
        if (__lock)                                                     \
                lov_conf_freeze(__maylock_obj);                         \
        __result = LOV_2DISPATCH_NOLOCK(__maylock_obj, op, __VA_ARGS__);\
        if (__lock)                                                     \
                lov_conf_thaw(__maylock_obj);                           \
        __result;                                                       \
})
794
/**
 * Performs a locked double-dispatch based on the layout type of an object.
 * Shorthand for LOV_2DISPATCH_MAYLOCK() with locking always enabled.
 */
#define LOV_2DISPATCH(obj, op, ...) \
        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
800
/**
 * Locked dispatch for layout methods whose result is not used.
 *
 * Takes the shared layout guard with lov_conf_freeze(), samples
 * __obj->lo_type, asserts that it indexes a valid lov_dispatch slot, calls
 * \a op from that slot with the remaining arguments and finally releases the
 * guard with lov_conf_thaw(). \a obj is evaluated exactly once.
 */
801 #define LOV_2DISPATCH_VOID(obj, op, ...)                                \
802 do {                                                                    \
803         struct lov_object                      *__obj = (obj);          \
804         enum lov_layout_type                    __llt;                  \
805                                                                         \
806         lov_conf_freeze(__obj);                                         \
807         __llt = __obj->lo_type;                                         \
808         LASSERT(__llt < ARRAY_SIZE(lov_dispatch));                      \
809         lov_dispatch[__llt].op(__VA_ARGS__);                            \
810         lov_conf_thaw(__obj);                                           \
811 } while (0)
812
813 static void lov_conf_lock(struct lov_object *lov)
814 {
815         LASSERT(lov->lo_owner != current);
816         down_write(&lov->lo_type_guard);
817         LASSERT(lov->lo_owner == NULL);
818         lov->lo_owner = current;
819         CDEBUG(D_INODE, "Took exclusive lov(%p) owner %p\n",
820                 lov, lov->lo_owner);
821 }
822
823 static void lov_conf_unlock(struct lov_object *lov)
824 {
825         CDEBUG(D_INODE, "To release exclusive lov(%p) owner %p\n",
826                 lov, lov->lo_owner);
827         lov->lo_owner = NULL;
828         up_write(&lov->lo_type_guard);
829 }
830
831 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
832 {
833         struct l_wait_info lwi = { 0 };
834         ENTRY;
835
836         while (atomic_read(&lov->lo_active_ios) > 0) {
837                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
838                         PFID(lu_object_fid(lov2lu(lov))),
839                         atomic_read(&lov->lo_active_ios));
840
841                 l_wait_event(lov->lo_waitq,
842                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
843         }
844         RETURN(0);
845 }
846
/**
 * Replace the layout currently installed on \a lov with the one described
 * by \a lsm.
 *
 * The old layout is pruned, deleted and finalised through the
 * lov_layout_operations selected by the current lov->lo_type; the operations
 * selected by lov_type(lsm) are then used to initialise and install the new
 * layout. The function works in an environment obtained from cl_env_get();
 * its first parameter is not used.
 *
 * \param unused [in]   not used by this function
 * \param lov [in,out]  object whose layout is replaced
 * \param lsm [in]      striping descriptor of the new layout
 * \param conf [in]     configuration handed on to llo_init()
 *
 * \retval 0    new layout installed and lov->lo_type updated
 * \retval < 0  error from cl_env_get(), cl_object_prune(), llo_delete() or
 *              llo_init()
 */
847 static int lov_layout_change(const struct lu_env *unused,
848                              struct lov_object *lov, struct lov_stripe_md *lsm,
849                              const struct cl_object_conf *conf)
850 {
851         enum lov_layout_type llt = lov_type(lsm);
852         union lov_layout_state *state = &lov->u;
853         const struct lov_layout_operations *old_ops;
854         const struct lov_layout_operations *new_ops;
855         struct lov_device *lov_dev = lov_object_dev(lov);
856         struct lu_env *env;
857         __u16 refcheck;
858         int rc;
859         ENTRY;
860
861         LASSERT(lov->lo_type < ARRAY_SIZE(lov_dispatch));
862
863         env = cl_env_get(&refcheck);
864         if (IS_ERR(env))
865                 RETURN(PTR_ERR(env));
866
867         LASSERT(llt < ARRAY_SIZE(lov_dispatch));
868
869         CDEBUG(D_INODE, DFID" from %s to %s\n",
870                PFID(lu_object_fid(lov2lu(lov))),
871                llt2str(lov->lo_type), llt2str(llt));
872
873         old_ops = &lov_dispatch[lov->lo_type];
874         new_ops = &lov_dispatch[llt];
875
        /* a failure in the next two steps leaves lov->lo_type untouched */
876         rc = cl_object_prune(env, &lov->lo_cl);
877         if (rc != 0)
878                 GOTO(out, rc);
879
880         rc = old_ops->llo_delete(env, lov, &lov->u);
881         if (rc != 0)
882                 GOTO(out, rc);
883
884         old_ops->llo_fini(env, lov, &lov->u);
885
886         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
887
888         CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
889                PFID(lu_object_fid(lov2lu(lov))), lov, llt);
890
        /* the object is EMPTY from here until the new layout is installed;
         * it stays EMPTY if llo_init() below fails */
891         lov->lo_type = LLT_EMPTY;
892
893         /* page bufsize fixup */
894         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
895                 lov_page_slice_fixup(lov, NULL);
896
897         rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
898         if (rc != 0) {
899                 struct obd_device *obd = lov2obd(lov_dev->ld_lov);
900
901                 CERROR("%s: cannot apply new layout on "DFID" : rc = %d\n",
902                        obd->obd_name, PFID(lu_object_fid(lov2lu(lov))), rc);
                /* undo whatever the failed llo_init() managed to set up */
903                 new_ops->llo_delete(env, lov, state);
904                 new_ops->llo_fini(env, lov, state);
905                 /* this file becomes an EMPTY file. */
906                 GOTO(out, rc);
907         }
908
909         new_ops->llo_install(env, lov, state);
910         lov->lo_type = llt;
911
912 out:
913         cl_env_put(env, &refcheck);
914         RETURN(rc);
915 }
916
917 /*****************************************************************************
918  *
919  * Lov object operations.
920  *
921  */
922 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
923                     const struct lu_object_conf *conf)
924 {
925         struct lov_object            *lov   = lu2lov(obj);
926         struct lov_device            *dev   = lov_object_dev(lov);
927         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
928         union lov_layout_state       *set   = &lov->u;
929         const struct lov_layout_operations *ops;
930         struct lov_stripe_md *lsm = NULL;
931         int rc;
932         ENTRY;
933
934         init_rwsem(&lov->lo_type_guard);
935         atomic_set(&lov->lo_active_ios, 0);
936         init_waitqueue_head(&lov->lo_waitq);
937         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
938
939         lov->lo_type = LLT_EMPTY;
940         if (cconf->u.coc_layout.lb_buf != NULL) {
941                 lsm = lov_unpackmd(dev->ld_lov,
942                                    cconf->u.coc_layout.lb_buf,
943                                    cconf->u.coc_layout.lb_len);
944                 if (IS_ERR(lsm))
945                         RETURN(PTR_ERR(lsm));
946
947                 dump_lsm(D_INODE, lsm);
948         }
949
950         /* no locking is necessary, as object is being created */
951         lov->lo_type = lov_type(lsm);
952         ops = &lov_dispatch[lov->lo_type];
953         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
954         if (rc != 0)
955                 GOTO(out_lsm, rc);
956
957         ops->llo_install(env, lov, set);
958
959 out_lsm:
960         lov_lsm_put(lsm);
961
962         RETURN(rc);
963 }
964
/**
 * Handle a layout configuration request for \a obj.
 *
 * The action depends on conf->coc_opc:
 *  - OBJECT_CONF_INVALIDATE: flag the current layout as invalid;
 *  - OBJECT_CONF_WAIT: if the layout is flagged invalid and IO is active,
 *    drop the exclusive guard, wait in lov_layout_wait() and take the guard
 *    again;
 *  - OBJECT_CONF_SET: unpack the supplied layout buffer (if any) and switch
 *    to it with lov_layout_change(), unless it is considered the same as the
 *    layout already installed.
 *
 * \param env [in]      lustre environment
 * \param obj [in,out]  cl_object whose layout is configured
 * \param conf [in]     requested operation and, for SET, the layout buffer
 *
 * \retval 0            request handled
 * \retval -EBUSY       the layout has to change but IO is still active; the
 *                      layout is flagged invalid instead
 * \retval < 0          error from lov_unpackmd() or lov_layout_change()
 */
965 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
966                         const struct cl_object_conf *conf)
967 {
968         struct lov_stripe_md    *lsm = NULL;
969         struct lov_object       *lov = cl2lov(obj);
970         int                      result = 0;
971         ENTRY;
972
        /* unpack before taking the exclusive guard */
973         if (conf->coc_opc == OBJECT_CONF_SET &&
974             conf->u.coc_layout.lb_buf != NULL) {
975                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
976                                    conf->u.coc_layout.lb_buf,
977                                    conf->u.coc_layout.lb_len);
978                 if (IS_ERR(lsm))
979                         RETURN(PTR_ERR(lsm));
980         }
981
982         lov_conf_lock(lov);
983         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
984                 lov->lo_layout_invalid = true;
985                 GOTO(out, result = 0);
986         }
987
988         if (conf->coc_opc == OBJECT_CONF_WAIT) {
989                 if (lov->lo_layout_invalid &&
990                     atomic_read(&lov->lo_active_ios) > 0) {
                        /* the guard is not held while sleeping and is
                         * re-taken so that the common exit path can
                         * release it */
991                         lov_conf_unlock(lov);
992                         result = lov_layout_wait(env, lov);
993                         lov_conf_lock(lov);
994                 }
995                 GOTO(out, result);
996         }
997
998         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
999
        /* layouts count as identical when both are absent, or when both are
         * present with equal generation and equal pattern in entry 0 */
1000         if ((lsm == NULL && lov->lo_lsm == NULL) ||
1001             ((lsm != NULL && lov->lo_lsm != NULL) &&
1002              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
1003              (lov->lo_lsm->lsm_entries[0]->lsme_pattern ==
1004               lsm->lsm_entries[0]->lsme_pattern))) {
1005                 /* same version of layout */
1006                 lov->lo_layout_invalid = false;
1007                 GOTO(out, result = 0);
1008         }
1009
1010         /* will change layout - check if there still exists active IO. */
1011         if (atomic_read(&lov->lo_active_ios) > 0) {
1012                 lov->lo_layout_invalid = true;
1013                 GOTO(out, result = -EBUSY);
1014         }
1015
1016         result = lov_layout_change(env, lov, lsm, conf);
        /* a failed change leaves the layout flagged invalid */
1017         lov->lo_layout_invalid = result != 0;
1018         EXIT;
1019
1020 out:
1021         lov_conf_unlock(lov);
1022         lov_lsm_put(lsm);
1023         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
1024                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
1025         RETURN(result);
1026 }
1027
1028 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
1029 {
1030         struct lov_object *lov = lu2lov(obj);
1031
1032         ENTRY;
1033         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
1034         EXIT;
1035 }
1036
1037 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
1038 {
1039         struct lov_object *lov = lu2lov(obj);
1040
1041         ENTRY;
1042         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
1043         lu_object_fini(obj);
1044         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
1045         EXIT;
1046 }
1047
1048 static int lov_object_print(const struct lu_env *env, void *cookie,
1049                             lu_printer_t p, const struct lu_object *o)
1050 {
1051         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
1052 }
1053
1054 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
1055                   struct cl_page *page, pgoff_t index)
1056 {
1057         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
1058                                     index);
1059 }
1060
1061 /**
1062  * Implements cl_object_operations::clo_io_init() method for lov
1063  * layer. Dispatches to the appropriate layout io initialization method.
1064  */
1065 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
1066                 struct cl_io *io)
1067 {
1068         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
1069
1070         CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
1071                PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
1072                io->ci_ignore_layout, io->ci_verify_layout);
1073
1074         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
1075                                      !io->ci_ignore_layout, env, obj, io);
1076 }
1077
1078 /**
1079  * An implementation of cl_object_operations::clo_attr_get() method for lov
1080  * layer. For raid0 layout this collects and merges attributes of all
1081  * sub-objects.
1082  */
1083 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
1084                         struct cl_attr *attr)
1085 {
1086         /* do not take lock, as this function is called under a
1087          * spin-lock. Layout is protected from changing by ongoing IO. */
1088         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
1089 }
1090
/**
 * Attribute update hook of the lov layer.
 *
 * No layout implements this operation, so there is nothing to dispatch to
 * and every argument is ignored.
 *
 * \retval 0    always
 */
static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
                           const struct cl_attr *attr, unsigned valid)
{
        return 0;
}
1099
1100 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
1101                   struct cl_lock *lock, const struct cl_io *io)
1102 {
1103         /* No need to lock because we've taken one refcount of layout.  */
1104         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
1105                                     io);
1106 }
1107
1108 /**
1109  * We calculate on which OST the mapping will end. If the length of mapping
1110  * is greater than (stripe_size * stripe_count) then the last_stripe will
1111  * will be one just before start_stripe. Else we check if the mapping
1112  * intersects each OST and find last_stripe.
1113  * This function returns the last_stripe and also sets the stripe_count
1114  * over which the mapping is spread
1115  *
1116  * \param lsm [in]              striping information for the file
1117  * \param index [in]            stripe component index
1118  * \param ext [in]              logical extent of mapping
1119  * \param start_stripe [in]     starting stripe of the mapping
1120  * \param stripe_count [out]    the number of stripes across which to map is
1121  *                              returned
1122  *
1123  * \retval last_stripe          return the last stripe of the mapping
1124  */
1125 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, int index,
1126                                    struct lu_extent *ext,
1127                                    int start_stripe, int *stripe_count)
1128 {
1129         struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
1130         int last_stripe;
1131         u64 obd_start;
1132         u64 obd_end;
1133         int i, j;
1134
1135         if (ext->e_end - ext->e_start >
1136             lsme->lsme_stripe_size * lsme->lsme_stripe_count) {
1137                 last_stripe = (start_stripe < 1 ? lsme->lsme_stripe_count - 1 :
1138                                                   start_stripe - 1);
1139                 *stripe_count = lsme->lsme_stripe_count;
1140         } else {
1141                 for (j = 0, i = start_stripe; j < lsme->lsme_stripe_count;
1142                      i = (i + 1) % lsme->lsme_stripe_count, j++) {
1143                         if ((lov_stripe_intersects(lsm, index,  i, ext,
1144                                                    &obd_start, &obd_end)) == 0)
1145                                 break;
1146                 }
1147                 *stripe_count = j;
1148                 last_stripe = (start_stripe + j - 1) % lsme->lsme_stripe_count;
1149         }
1150
1151         return last_stripe;
1152 }
1153
1154 /**
1155  * Set fe_device and copy extents from local buffer into main return buffer.
1156  *
1157  * \param fiemap [out]          fiemap to hold all extents
1158  * \param lcl_fm_ext [in]       array of fiemap extents get from OSC layer
1159  * \param ost_index [in]        OST index to be written into the fm_device
1160  *                              field for each extent
1161  * \param ext_count [in]        number of extents to be copied
1162  * \param current_extent [in]   where to start copying in the extent array
1163  */
1164 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1165                                          struct fiemap_extent *lcl_fm_ext,
1166                                          int ost_index, unsigned int ext_count,
1167                                          int current_extent)
1168 {
1169         char            *to;
1170         unsigned int    ext;
1171
1172         for (ext = 0; ext < ext_count; ext++) {
1173                 lcl_fm_ext[ext].fe_device = ost_index;
1174                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1175         }
1176
1177         /* Copy fm_extent's from fm_local to return buffer */
1178         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1179         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
1180 }
1181
/* Upper bound, in bytes, for the local fiemap buffer that
 * lov_object_fiemap() allocates; a smaller buffer is used when the
 * requested extent count needs less. */
1182 #define FIEMAP_BUFFER_SIZE 4096
1183
1184 /**
1185  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1186  * call. The local end offset and the device are sent in the first
1187  * fm_extent. This function calculates the stripe number from the index.
1188  * This function returns a stripe_no on which mapping is to be restarted.
1189  *
1190  * This function returns fm_end_offset which is the in-OST offset at which
1191  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1192  * will re-calculate proper offset in next stripe.
1193  * Note that the first extent is passed to lov_get_info via the value field.
1194  *
1195  * \param fiemap [in]           fiemap request header
1196  * \param lsm [in]              striping information for the file
1197  * \param index [in]            stripe component index
1198  * \param ext [in]              logical extent of mapping
1199  * \param start_stripe [out]    starting stripe will be returned in this
1200  */
1201 static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1202                                      struct lov_stripe_md *lsm,
1203                                      int index, struct lu_extent *ext,
1204                                      int *start_stripe)
1205 {
1206         struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
1207         u64 local_end = fiemap->fm_extents[0].fe_logical;
1208         u64 lun_start;
1209         u64 lun_end;
1210         u64 fm_end_offset;
1211         int stripe_no = -1;
1212         int i;
1213
1214         if (fiemap->fm_extent_count == 0 ||
1215             fiemap->fm_extents[0].fe_logical == 0)
1216                 return 0;
1217
1218         /* Find out stripe_no from ost_index saved in the fe_device */
1219         for (i = 0; i < lsme->lsme_stripe_count; i++) {
1220                 struct lov_oinfo *oinfo = lsme->lsme_oinfo[i];
1221
1222                 if (lov_oinfo_is_dummy(oinfo))
1223                         continue;
1224
1225                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1226                         stripe_no = i;
1227                         break;
1228                 }
1229         }
1230
1231         if (stripe_no == -1)
1232                 return -EINVAL;
1233
1234         /* If we have finished mapping on previous device, shift logical
1235          * offset to start of next device */
1236         if (lov_stripe_intersects(lsm, index, stripe_no, ext,
1237                                    &lun_start, &lun_end) != 0 &&
1238             local_end < lun_end) {
1239                 fm_end_offset = local_end;
1240                 *start_stripe = stripe_no;
1241         } else {
1242                 /* This is a special value to indicate that caller should
1243                  * calculate offset in next stripe. */
1244                 fm_end_offset = 0;
1245                 *start_stripe = (stripe_no + 1) % lsme->lsme_stripe_count;
1246         }
1247
1248         return fm_end_offset;
1249 }
1250
/**
 * Working state shared between lov_object_fiemap() and fiemap_for_stripe()
 * while a FIEMAP request is broken down per component and per stripe.
 */
1251 struct fiemap_state {
        /* local fiemap buffer used for the per-object requests */
1252         struct fiemap           *fs_fm;
        /* logical extent being mapped within the current component */
1253         struct lu_extent        fs_ext;
        /* length of fs_ext: e_end - e_start + 1 */
1254         u64                     fs_length;
        /* value returned by fiemap_calc_fm_end_offset(); when non-zero it
         * replaces the start offset on fs_start_stripe */
1255         u64                     fs_end_offset;
        /* number of extents accumulated so far; also the slot at which the
         * next batch is copied into the caller's buffer */
1256         int                     fs_cur_extent;
        /* number of extents to ask for in one per-object request */
1257         int                     fs_cnt_need;
        /* stripe on which mapping of the current component starts */
1258         int                     fs_start_stripe;
        /* stripe on which mapping of the current component ends */
1259         int                     fs_last_stripe;
        /* set when the mapping needed from the current stripe is complete */
1260         bool                    fs_device_done;
        /* set once fs_last_stripe has been processed */
1261         bool                    fs_finish_stripe;
        /* set once fs_cur_extent has reached the requested extent count */
1262         bool                    fs_enough;
1263 };
1264
/**
 * Collect the FIEMAP extents of one stripe of one layout component.
 *
 * Works out which part of fs->fs_ext falls on stripe \a stripeno, looks up
 * the sub-object backing that stripe and queries it with cl_object_fiemap()
 * into the local buffer fs->fs_fm, possibly several times. Extents that
 * come back are tagged with the OST index and appended to \a fiemap. The
 * loop stops when the needed range is covered, when the object's end is
 * reached, or when a query returns no extents. An inactive OST is reported
 * as a single extent flagged FIEMAP_EXTENT_UNKNOWN without querying it.
 *
 * \param env [in]              lustre environment
 * \param obj [in]              file object
 * \param lsm [in]              striping information for the file
 * \param fiemap [in,out]       request header and buffer receiving extents
 * \param buflen [in,out]       set to the size of each per-object request
 *                              and passed on to cl_object_fiemap()
 * \param fmkey [in,out]        fiemap key; its lfik_fiemap is overwritten
 *                              with the per-object request
 * \param index [in]            stripe component index
 * \param stripeno [in]         stripe to map within the component
 * \param fs [in,out]           running state of the whole request
 *
 * \retval 0            success, including "stripe does not intersect"
 * \retval -EIO         the stripe's object info is a dummy
 * \retval -EINVAL      the stripe's OST index is out of range
 * \retval < 0          error from lov_find_subobj() or cl_object_fiemap()
 */
1265 int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
1266                       struct lov_stripe_md *lsm, struct fiemap *fiemap,
1267                       size_t *buflen, struct ll_fiemap_info_key *fmkey,
1268                       int index, int stripeno, struct fiemap_state *fs)
1269 {
1270         struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
1271         struct cl_object *subobj;
1272         struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1273         struct fiemap_extent *fm_ext = &fs->fs_fm->fm_extents[0];
1274         u64 req_fm_len; /* Stores length of required mapping */
1275         u64 len_mapped_single_call;
1276         u64 lun_start;
1277         u64 lun_end;
1278         u64 obd_object_end;
1279         unsigned int ext_count;
1280         /* EOF for object */
1281         bool ost_eof = false;
1282         /* done with required mapping for this OST? */
1283         bool ost_done = false;
1284         int ost_index;
1285         int rc = 0;
1286
1287         fs->fs_device_done = false;
1288         /* Find out range of mapping on this stripe */
1289         if ((lov_stripe_intersects(lsm, index, stripeno, &fs->fs_ext,
1290                                    &lun_start, &obd_object_end)) == 0)
1291                 return 0;
1292
1293         if (lov_oinfo_is_dummy(lsme->lsme_oinfo[stripeno]))
1294                 return -EIO;
1295
1296         /* If this is a continuation FIEMAP call and we are on
1297          * starting stripe then lun_start needs to be set to
1298          * end_offset */
1299         if (fs->fs_end_offset != 0 && stripeno == fs->fs_start_stripe)
1300                 lun_start = fs->fs_end_offset;
1301         lun_end = lov_size_to_stripe(lsm, index, fs->fs_ext.e_end, stripeno);
        /* nothing left to map on this stripe */
1302         if (lun_start == lun_end)
1303                 return 0;
1304
1305         req_fm_len = obd_object_end - lun_start;
1306         fs->fs_fm->fm_length = 0;
1307         len_mapped_single_call = 0;
1308
1309         /* find the sub-object backing this stripe */
1310         subobj = lov_find_subobj(env, cl2lov(obj), lsm,
1311                                  lov_comp_index(index, stripeno));
1312         if (IS_ERR(subobj))
1313                 return PTR_ERR(subobj);
1314         /* If the output buffer is very large and the objects have many
1315          * extents we may need to loop on a single OST repeatedly */
1316         do {
1317                 if (fiemap->fm_extent_count > 0) {
1318                         /* Don't get too many extents. */
1319                         if (fs->fs_cur_extent + fs->fs_cnt_need >
1320                             fiemap->fm_extent_count)
1321                                 fs->fs_cnt_need = fiemap->fm_extent_count -
1322                                                   fs->fs_cur_extent;
1323                 }
1324
                /* advance past what the previous iteration mapped */
1325                 lun_start += len_mapped_single_call;
1326                 fs->fs_fm->fm_length = req_fm_len - len_mapped_single_call;
1327                 req_fm_len = fs->fs_fm->fm_length;
1328                 /**
1329                  * If we've collected enough extent map, we'd request 1 more,
1330                  * to see whether we coincidentally finished all available
1331                  * extent map, so that FIEMAP_EXTENT_LAST would be set.
1332                  */
1333                 fs->fs_fm->fm_extent_count = fs->fs_enough ?
1334                                              1 : fs->fs_cnt_need;
1335                 fs->fs_fm->fm_mapped_extents = 0;
1336                 fs->fs_fm->fm_flags = fiemap->fm_flags;
1337
1338                 ost_index = lsme->lsme_oinfo[stripeno]->loi_ost_idx;
1339
1340                 if (ost_index < 0 || ost_index >= lov->desc.ld_tgt_count)
1341                         GOTO(obj_put, rc = -EINVAL);
1342                 /* If OST is inactive, return extent with UNKNOWN flag. */
1343                 if (!lov->lov_tgts[ost_index]->ltd_active) {
1344                         fs->fs_fm->fm_flags |= FIEMAP_EXTENT_LAST;
1345                         fs->fs_fm->fm_mapped_extents = 1;
1346
1347                         fm_ext[0].fe_logical = lun_start;
1348                         fm_ext[0].fe_length = obd_object_end - lun_start;
1349                         fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1350
                        /* skip the query; the synthetic extent is handled
                         * like a real reply below */
1351                         goto inactive_tgt;
1352                 }
1353
1354                 fs->fs_fm->fm_start = lun_start;
1355                 fs->fs_fm->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1356                 memcpy(&fmkey->lfik_fiemap, fs->fs_fm, sizeof(*fs->fs_fm));
1357                 *buflen = fiemap_count_to_size(fs->fs_fm->fm_extent_count);
1358
1359                 rc = cl_object_fiemap(env, subobj, fmkey, fs->fs_fm, buflen);
1360                 if (rc != 0)
1361                         GOTO(obj_put, rc);
1362 inactive_tgt:
1363                 ext_count = fs->fs_fm->fm_mapped_extents;
1364                 if (ext_count == 0) {
1365                         ost_done = true;
1366                         fs->fs_device_done = true;
1367                         /* If last stripe has hole at the end,
1368                          * we need to return */
1369                         if (stripeno == fs->fs_last_stripe) {
1370                                 fiemap->fm_mapped_extents = 0;
1371                                 fs->fs_finish_stripe = true;
1372                                 GOTO(obj_put, rc);
1373                         }
1374                         break;
1375                 } else if (fs->fs_enough) {
1376                         /*
1377                          * We've collected enough extents and there are
1378                          * more extents after it.
1379                          */
1380                         GOTO(obj_put, rc);
1381                 }
1382
1383                 /* If we just need num of extents, go to next device */
1384                 if (fiemap->fm_extent_count == 0) {
1385                         fs->fs_cur_extent += ext_count;
1386                         break;
1387                 }
1388
1389                 /* prepare to copy retrieved map extents */
1390                 len_mapped_single_call = fm_ext[ext_count - 1].fe_logical +
1391                                          fm_ext[ext_count - 1].fe_length -
1392                                          lun_start;
1393
1394                 /* Have we finished mapping on this device? */
1395                 if (req_fm_len <= len_mapped_single_call) {
1396                         ost_done = true;
1397                         fs->fs_device_done = true;
1398                 }
1399
1400                 /* Clear the EXTENT_LAST flag which can be present on
1401                  * the last extent */
1402                 if (fm_ext[ext_count - 1].fe_flags & FIEMAP_EXTENT_LAST)
1403                         fm_ext[ext_count - 1].fe_flags &= ~FIEMAP_EXTENT_LAST;
                /* the end of the last returned extent, converted by
                 * lov_stripe_size(), is at or beyond the size in fmkey */
1404                 if (lov_stripe_size(lsm, index,
1405                                     fm_ext[ext_count - 1].fe_logical +
1406                                     fm_ext[ext_count - 1].fe_length,
1407                                     stripeno) >= fmkey->lfik_oa.o_size) {
1408                         ost_eof = true;
1409                         fs->fs_device_done = true;
1410                 }
1411
1412                 fiemap_prepare_and_copy_exts(fiemap, fm_ext, ost_index,
1413                                              ext_count, fs->fs_cur_extent);
1414                 fs->fs_cur_extent += ext_count;
1415
1416                 /* Ran out of available extents? */
1417                 if (fs->fs_cur_extent >= fiemap->fm_extent_count)
1418                         fs->fs_enough = true;
1419         } while (!ost_done && !ost_eof);
1420
1421         if (stripeno == fs->fs_last_stripe)
1422                 fs->fs_finish_stripe = true;
1423 obj_put:
        /* drop the sub-object reference on every exit past the lookup */
1424         cl_object_put(env, subobj);
1425
1426         return rc;
1427 }
1428
1429 /**
1430  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1431  * This also handles the restarting of FIEMAP calls in case mapping overflows
1432  * the available number of extents in single call.
1433  *
1434  * \param env [in]              lustre environment
1435  * \param obj [in]              file object
1436  * \param fmkey [in]            fiemap request header and other info
1437  * \param fiemap [out]          fiemap buffer holding retrived map extents
1438  * \param buflen [in/out]       max buffer length of @fiemap, when iterate
1439  *                              each OST, it is used to limit max map needed
1440  * \retval 0    success
1441  * \retval < 0  error
1442  */
1443 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1444                              struct ll_fiemap_info_key *fmkey,
1445                              struct fiemap *fiemap, size_t *buflen)
1446 {
1447         struct lov_stripe_md_entry *lsme;
1448         struct lov_stripe_md *lsm;
1449         struct fiemap *fm_local = NULL;
1450         loff_t whole_start;
1451         loff_t whole_end;
1452         int entry;
1453         int start_entry;
1454         int end_entry;
1455         int cur_stripe = 0;
1456         int stripe_count;
1457         unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
1458         int rc = 0;
1459         struct fiemap_state fs = { 0 };
1460         ENTRY;
1461
1462         lsm = lov_lsm_addref(cl2lov(obj));
1463         if (lsm == NULL)
1464                 RETURN(-ENODATA);
1465
1466         if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
1467                 /**
1468                  * If the entry count > 1 or stripe_count > 1 and the
1469                  * application does not understand DEVICE_ORDER flag,
1470                  * it cannot interpret the extents correctly.
1471                  */
1472                 if (lsm->lsm_entry_count > 1 ||
1473                     (lsm->lsm_entry_count == 1 &&
1474                      lsm->lsm_entries[0]->lsme_stripe_count > 1))
1475                         GOTO(out_lsm, rc = -ENOTSUPP);
1476         }
1477
1478         if (lsm->lsm_is_released) {
1479                 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1480                         /**
1481                          * released file, return a minimal FIEMAP if
1482                          * request fits in file-size.
1483                          */
1484                         fiemap->fm_mapped_extents = 1;
1485                         fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1486                         if (fiemap->fm_start + fiemap->fm_length <
1487                             fmkey->lfik_oa.o_size)
1488                                 fiemap->fm_extents[0].fe_length =
1489                                         fiemap->fm_length;
1490                         else
1491                                 fiemap->fm_extents[0].fe_length =
1492                                         fmkey->lfik_oa.o_size -
1493                                         fiemap->fm_start;
1494                         fiemap->fm_extents[0].fe_flags |=
1495                                 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1496                 }
1497                 GOTO(out_lsm, rc = 0);
1498         }
1499
1500         /* buffer_size is small to hold fm_extent_count of extents. */
1501         if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1502                 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1503
1504         OBD_ALLOC_LARGE(fm_local, buffer_size);
1505         if (fm_local == NULL)
1506                 GOTO(out_lsm, rc = -ENOMEM);
1507
1508         /**
1509          * Requested extent count exceeds the fiemap buffer size, shrink our
1510          * ambition.
1511          */
1512         if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1513                 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1514         if (fiemap->fm_extent_count == 0)
1515                 fs.fs_cnt_need = 0;
1516
1517         fs.fs_enough = false;
1518         fs.fs_cur_extent = 0;
1519         fs.fs_fm = fm_local;
1520         fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
1521
1522         whole_start = fiemap->fm_start;
1523         /* whole_start is beyond the end of the file */
1524         if (whole_start > fmkey->lfik_oa.o_size)
1525                 GOTO(out_fm_local, rc = -EINVAL);
1526         whole_end = (fiemap->fm_length == OBD_OBJECT_EOF) ?
1527                                         fmkey->lfik_oa.o_size :
1528                                         whole_start + fiemap->fm_length - 1;
1529         /**
1530          * If fiemap->fm_length != OBD_OBJECT_EOF but whole_end exceeds file
1531          * size
1532          */
1533         if (whole_end > fmkey->lfik_oa.o_size)
1534                 whole_end = fmkey->lfik_oa.o_size;
1535
1536         start_entry = lov_lsm_entry(lsm, whole_start);
1537         end_entry = lov_lsm_entry(lsm, whole_end);
1538         if (end_entry == -1)
1539                 end_entry = lsm->lsm_entry_count - 1;
1540
1541         if (start_entry == -1 || end_entry == -1)
1542                 GOTO(out_fm_local, rc = -EINVAL);
1543
1544         for (entry = start_entry; entry <= end_entry; entry++) {
1545                 lsme = lsm->lsm_entries[entry];
1546
1547                 if (entry == start_entry)
1548                         fs.fs_ext.e_start = whole_start;
1549                 else
1550                         fs.fs_ext.e_start = lsme->lsme_extent.e_start;
1551                 if (entry == end_entry)
1552                         fs.fs_ext.e_end = whole_end;
1553                 else
1554                         fs.fs_ext.e_end = lsme->lsme_extent.e_end - 1;
1555                 fs.fs_length = fs.fs_ext.e_end - fs.fs_ext.e_start + 1;
1556
1557                 /* Calculate start stripe, last stripe and length of mapping */
1558                 fs.fs_start_stripe = lov_stripe_number(lsm, entry,
1559                                                        fs.fs_ext.e_start);
1560                 fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, entry,
1561                                         &fs.fs_ext, fs.fs_start_stripe,
1562                                         &stripe_count);
1563                 fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, entry,
1564                                         &fs.fs_ext, &fs.fs_start_stripe);
1565                 /* Check each stripe */
1566                 for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
1567                      --stripe_count,
1568                      cur_stripe = (cur_stripe + 1) % lsme->lsme_stripe_count) {
1569                         rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen,
1570                                                fmkey, entry, cur_stripe, &fs);
1571                         if (rc < 0)
1572                                 GOTO(out_fm_local, rc);
1573                         if (fs.fs_enough)
1574                                 GOTO(finish, rc);
1575                         if (fs.fs_finish_stripe)
1576                                 break;
1577                 } /* for each stripe */
1578         } /* for covering layout component */
1579         /*
1580          * We've traversed all components, set @entry to the last component
1581          * entry, it's for the last stripe check.
1582          */
1583         entry--;
1584 finish:
1585         /* Indicate that we are returning device offsets unless file just has
1586          * single stripe */
1587         if (lsm->lsm_entry_count > 1 ||
1588             (lsm->lsm_entry_count == 1 &&
1589              lsm->lsm_entries[0]->lsme_stripe_count > 1))
1590                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1591
1592         if (fiemap->fm_extent_count == 0)
1593                 goto skip_last_device_calc;
1594
1595         /* Check if we have reached the last stripe and whether mapping for that
1596          * stripe is done. */
1597         if ((cur_stripe == fs.fs_last_stripe) && fs.fs_device_done)
1598                 fiemap->fm_extents[fs.fs_cur_extent - 1].fe_flags |=
1599                                                              FIEMAP_EXTENT_LAST;
1600 skip_last_device_calc:
1601         fiemap->fm_mapped_extents = fs.fs_cur_extent;
1602 out_fm_local:
1603         OBD_FREE_LARGE(fm_local, buffer_size);
1604
1605 out_lsm:
1606         lov_lsm_put(lsm);
1607         return rc;
1608 }
1609
1610 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1611                                 struct lov_user_md __user *lum)
1612 {
1613         struct lov_object       *lov = cl2lov(obj);
1614         struct lov_stripe_md    *lsm;
1615         int                     rc = 0;
1616         ENTRY;
1617
1618         lsm = lov_lsm_addref(lov);
1619         if (lsm == NULL)
1620                 RETURN(-ENODATA);
1621
1622         rc = lov_getstripe(cl2lov(obj), lsm, lum);
1623         lov_lsm_put(lsm);
1624         RETURN(rc);
1625 }
1626
1627 static int lov_object_layout_get(const struct lu_env *env,
1628                                  struct cl_object *obj,
1629                                  struct cl_layout *cl)
1630 {
1631         struct lov_object *lov = cl2lov(obj);
1632         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1633         struct lu_buf *buf = &cl->cl_buf;
1634         ssize_t rc;
1635         ENTRY;
1636
1637         if (lsm == NULL) {
1638                 cl->cl_size = 0;
1639                 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1640
1641                 RETURN(0);
1642         }
1643
1644         cl->cl_size = lov_comp_md_size(lsm);
1645         cl->cl_layout_gen = lsm->lsm_layout_gen;
1646
1647         rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
1648         lov_lsm_put(lsm);
1649
1650         RETURN(rc < 0 ? rc : 0);
1651 }
1652
1653 static loff_t lov_object_maxbytes(struct cl_object *obj)
1654 {
1655         struct lov_object *lov = cl2lov(obj);
1656         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1657         loff_t maxbytes;
1658
1659         if (lsm == NULL)
1660                 return LLONG_MAX;
1661
1662         maxbytes = lsm->lsm_maxbytes;
1663
1664         lov_lsm_put(lsm);
1665
1666         return maxbytes;
1667 }
1668
1669 static const struct cl_object_operations lov_ops = {
1670         .coo_page_init    = lov_page_init,
1671         .coo_lock_init    = lov_lock_init,
1672         .coo_io_init      = lov_io_init,
1673         .coo_attr_get     = lov_attr_get,
1674         .coo_attr_update  = lov_attr_update,
1675         .coo_conf_set     = lov_conf_set,
1676         .coo_getstripe    = lov_object_getstripe,
1677         .coo_layout_get   = lov_object_layout_get,
1678         .coo_maxbytes     = lov_object_maxbytes,
1679         .coo_fiemap       = lov_object_fiemap,
1680 };
1681
1682 static const struct lu_object_operations lov_lu_obj_ops = {
1683         .loo_object_init      = lov_object_init,
1684         .loo_object_delete    = lov_object_delete,
1685         .loo_object_release   = NULL,
1686         .loo_object_free      = lov_object_free,
1687         .loo_object_print     = lov_object_print,
1688         .loo_object_invariant = NULL
1689 };
1690
1691 struct lu_object *lov_object_alloc(const struct lu_env *env,
1692                                    const struct lu_object_header *unused,
1693                                    struct lu_device *dev)
1694 {
1695         struct lov_object *lov;
1696         struct lu_object  *obj;
1697
1698         ENTRY;
1699         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1700         if (lov != NULL) {
1701                 obj = lov2lu(lov);
1702                 lu_object_init(obj, NULL, dev);
1703                 lov->lo_cl.co_ops = &lov_ops;
1704                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1705                 /*
1706                  * object io operation vector (cl_object::co_iop) is installed
1707                  * later in lov_object_init(), as different vectors are used
1708                  * for object with different layouts.
1709                  */
1710                 obj->lo_ops = &lov_lu_obj_ops;
1711         } else
1712                 obj = NULL;
1713         RETURN(obj);
1714 }
1715
1716 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1717 {
1718         struct lov_stripe_md *lsm = NULL;
1719
1720         lov_conf_freeze(lov);
1721         if (lov->lo_lsm != NULL) {
1722                 lsm = lsm_addref(lov->lo_lsm);
1723                 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1724                         lsm, atomic_read(&lsm->lsm_refc),
1725                         lov->lo_layout_invalid, current);
1726         }
1727         lov_conf_thaw(lov);
1728         return lsm;
1729 }
1730
1731 int lov_read_and_clear_async_rc(struct cl_object *clob)
1732 {
1733         struct lu_object *luobj;
1734         int rc = 0;
1735         ENTRY;
1736
1737         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1738                                  &lov_device_type);
1739         if (luobj != NULL) {
1740                 struct lov_object *lov = lu2lov(luobj);
1741
1742                 lov_conf_freeze(lov);
1743                 switch (lov->lo_type) {
1744                 case LLT_COMP: {
1745                         struct lov_stripe_md *lsm;
1746                         int i;
1747
1748                         lsm = lov->lo_lsm;
1749                         LASSERT(lsm != NULL);
1750                         for (i = 0; i < lsm->lsm_entry_count; i++) {
1751                                 struct lov_stripe_md_entry *lse =
1752                                                 lsm->lsm_entries[i];
1753                                 int j;
1754
1755                                 for (j = 0; j < lse->lsme_stripe_count; j++) {
1756                                         struct lov_oinfo *loi =
1757                                                         lse->lsme_oinfo[j];
1758
1759                                         if (lov_oinfo_is_dummy(loi))
1760                                                 continue;
1761
1762                                         if (loi->loi_ar.ar_rc && !rc)
1763                                                 rc = loi->loi_ar.ar_rc;
1764                                         loi->loi_ar.ar_rc = 0;
1765                                 }
1766                         }
1767                 }
1768                 case LLT_RELEASED:
1769                 case LLT_EMPTY:
1770                         break;
1771                 default:
1772                         LBUG();
1773                 }
1774                 lov_conf_thaw(lov);
1775         }
1776         RETURN(rc);
1777 }
1778 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1779
1780 /** @} lov */