Whamcloud - gitweb
LU-2766 llite: don't ignore layout for group lock request
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45
46 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
47 {
48         return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
49 }
50
51 /** \addtogroup lov
52  *  @{
53  */
54
55 /*****************************************************************************
56  *
57  * Layout operations.
58  *
59  */
60
61 struct lov_layout_operations {
62         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
63                         struct lov_object *lov, struct lov_stripe_md *lsm,
64                         const struct cl_object_conf *conf,
65                         union lov_layout_state *state);
66         int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
67                            union lov_layout_state *state);
68         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
69                          union lov_layout_state *state);
70         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
71                             union lov_layout_state *state);
72         int  (*llo_print)(const struct lu_env *env, void *cookie,
73                           lu_printer_t p, const struct lu_object *o);
74         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
75                               struct cl_page *page, pgoff_t index);
76         int  (*llo_lock_init)(const struct lu_env *env,
77                               struct cl_object *obj, struct cl_lock *lock,
78                               const struct cl_io *io);
79         int  (*llo_io_init)(const struct lu_env *env,
80                             struct cl_object *obj, struct cl_io *io);
81         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
82                             struct cl_attr *attr);
83 };
84
/* Forward declaration: the layout "delete" methods below call this before
 * the function itself is defined. */
static int lov_layout_wait(const struct lu_env *env,
                           struct lov_object *lov);
86
87 static void lov_lsm_put(struct lov_stripe_md *lsm)
88 {
89         if (lsm != NULL)
90                 lov_free_memmd(&lsm);
91 }
92
93 /*****************************************************************************
94  *
95  * Lov object layout operations.
96  *
97  */
98
/**
 * llo_install() method used by lov_dispatch[] for the LLT_EMPTY and
 * LLT_RELEASED layout types.
 */
static void lov_install_empty(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* A file without objects: there is nothing to install. */
}
107
/**
 * llo_init() method for the LLT_EMPTY layout type. No state is set up and
 * the call always succeeds.
 *
 * \retval 0 always
 */
static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
                          union lov_layout_state *state)
{
        return 0;
}
115
/**
 * llo_install() method for the LLT_RAID0 layout type. Intentionally a
 * no-op.
 */
static void lov_install_raid0(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* nothing to do */
}
121
122 static struct cl_object *lov_sub_find(const struct lu_env *env,
123                                       struct cl_device *dev,
124                                       const struct lu_fid *fid,
125                                       const struct cl_object_conf *conf)
126 {
127         struct lu_object *o;
128
129         ENTRY;
130         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
131         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
132         RETURN(lu2cl(o));
133 }
134
135 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
136                         struct cl_object *stripe, struct lov_layout_raid0 *r0,
137                         int idx)
138 {
139         struct cl_object_header *hdr;
140         struct cl_object_header *subhdr;
141         struct cl_object_header *parent;
142         struct lov_oinfo        *oinfo;
143         int result;
144
145         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
146                 /* For sanity:test_206.
147                  * Do not leave the object in cache to avoid accessing
148                  * freed memory. This is because osc_object is referring to
149                  * lov_oinfo of lsm_stripe_data which will be freed due to
150                  * this failure. */
151                 cl_object_kill(env, stripe);
152                 cl_object_put(env, stripe);
153                 return -EIO;
154         }
155
156         hdr    = cl_object_header(lov2cl(lov));
157         subhdr = cl_object_header(stripe);
158
159         oinfo = lov->lo_lsm->lsm_oinfo[idx];
160         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
161                " idx: %d gen: %d\n",
162                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
163                PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
164                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
165
166         /* reuse ->coh_attr_guard to protect coh_parent change */
167         spin_lock(&subhdr->coh_attr_guard);
168         parent = subhdr->coh_parent;
169         if (parent == NULL) {
170                 subhdr->coh_parent = hdr;
171                 spin_unlock(&subhdr->coh_attr_guard);
172                 subhdr->coh_nesting = hdr->coh_nesting + 1;
173                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
174                 r0->lo_sub[idx] = cl2lovsub(stripe);
175                 r0->lo_sub[idx]->lso_super = lov;
176                 r0->lo_sub[idx]->lso_index = idx;
177                 result = 0;
178         } else {
179                 struct lu_object  *old_obj;
180                 struct lov_object *old_lov;
181                 unsigned int mask = D_INODE;
182
183                 spin_unlock(&subhdr->coh_attr_guard);
184                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
185                 LASSERT(old_obj != NULL);
186                 old_lov = cl2lov(lu2cl(old_obj));
187                 if (old_lov->lo_layout_invalid) {
188                         /* the object's layout has already changed but isn't
189                          * refreshed */
190                         lu_object_unhash(env, &stripe->co_lu);
191                         result = -EAGAIN;
192                 } else {
193                         mask = D_ERROR;
194                         result = -EIO;
195                 }
196
197                 LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
198                                 "stripe %d is already owned.", idx);
199                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
200                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
201                 cl_object_put(env, stripe);
202         }
203         return result;
204 }
205
206 static int lov_page_slice_fixup(struct lov_object *lov,
207                                 struct cl_object *stripe)
208 {
209         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
210         struct cl_object *o;
211
212         if (stripe == NULL)
213                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
214                        cfs_size_round(sizeof(struct lov_page));
215
216         cl_object_for_each(o, stripe)
217                 o->co_slice_off += hdr->coh_page_bufsize;
218
219         return cl_object_header(stripe)->coh_page_bufsize;
220 }
221
222 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
223                           struct lov_object *lov, struct lov_stripe_md *lsm,
224                           const struct cl_object_conf *conf,
225                           union lov_layout_state *state)
226 {
227         int result;
228         int i;
229
230         struct cl_object        *stripe;
231         struct lov_thread_info  *lti     = lov_env_info(env);
232         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
233         struct lu_fid           *ofid    = &lti->lti_fid;
234         struct lov_layout_raid0 *r0      = &state->raid0;
235
236         ENTRY;
237
238         if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
239                 dump_lsm(D_ERROR, lsm);
240                 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
241                          LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
242         }
243
244         LASSERT(lov->lo_lsm == NULL);
245         lov->lo_lsm = lsm_addref(lsm);
246         r0->lo_nr = lsm->lsm_stripe_count;
247         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
248
249         lov->lo_layout_invalid = true;
250
251         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
252         if (r0->lo_sub != NULL) {
253                 int psz = 0;
254
255                 result = 0;
256                 subconf->coc_inode = conf->coc_inode;
257                 spin_lock_init(&r0->lo_sub_lock);
258                 /*
259                  * Create stripe cl_objects.
260                  */
261                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
262                         struct cl_device *subdev;
263                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
264                         int ost_idx = oinfo->loi_ost_idx;
265
266                         if (lov_oinfo_is_dummy(oinfo))
267                                 continue;
268
269                         result = ostid_to_fid(ofid, &oinfo->loi_oi,
270                                               oinfo->loi_ost_idx);
271                         if (result != 0)
272                                 GOTO(out, result);
273
274                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
275                         subconf->u.coc_oinfo = oinfo;
276                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
277                         /* In the function below, .hs_keycmp resolves to
278                          * lu_obj_hop_keycmp() */
279                         /* coverity[overrun-buffer-val] */
280                         stripe = lov_sub_find(env, subdev, ofid, subconf);
281                         if (!IS_ERR(stripe)) {
282                                 result = lov_init_sub(env, lov, stripe, r0, i);
283                                 if (result == -EAGAIN) { /* try again */
284                                         --i;
285                                         result = 0;
286                                         continue;
287                                 }
288                         } else {
289                                 result = PTR_ERR(stripe);
290                         }
291
292                         if (result == 0) {
293                                 int sz = lov_page_slice_fixup(lov, stripe);
294                                 LASSERT(ergo(psz > 0, psz == sz));
295                                 psz = sz;
296                         }
297                 }
298                 if (result == 0)
299                         cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
300         } else
301                 result = -ENOMEM;
302 out:
303         RETURN(result);
304 }
305
306 static int lov_init_released(const struct lu_env *env,
307                              struct lov_device *dev, struct lov_object *lov,
308                              struct lov_stripe_md *lsm,
309                              const struct cl_object_conf *conf,
310                              union lov_layout_state *state)
311 {
312         LASSERT(lsm != NULL);
313         LASSERT(lsm_is_released(lsm));
314         LASSERT(lov->lo_lsm == NULL);
315
316         lov->lo_lsm = lsm_addref(lsm);
317         return 0;
318 }
319
320 static struct cl_object *lov_find_subobj(const struct lu_env *env,
321                                          struct lov_object *lov,
322                                          struct lov_stripe_md *lsm,
323                                          int stripe_idx)
324 {
325         struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
326         struct lov_oinfo        *oinfo = lsm->lsm_oinfo[stripe_idx];
327         struct lov_thread_info  *lti = lov_env_info(env);
328         struct lu_fid           *ofid = &lti->lti_fid;
329         struct cl_device        *subdev;
330         int                     ost_idx;
331         int                     rc;
332         struct cl_object        *result;
333
334         if (lov->lo_type != LLT_RAID0)
335                 GOTO(out, result = NULL);
336
337         ost_idx = oinfo->loi_ost_idx;
338         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
339         if (rc != 0)
340                 GOTO(out, result = NULL);
341
342         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
343         result = lov_sub_find(env, subdev, ofid, NULL);
344 out:
345         if (result == NULL)
346                 result = ERR_PTR(-EINVAL);
347         return result;
348 }
349
350 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
351                             union lov_layout_state *state)
352 {
353         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
354
355         lov_layout_wait(env, lov);
356         return 0;
357 }
358
359 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
360                                struct lovsub_object *los, int idx)
361 {
362         struct cl_object        *sub;
363         struct lov_layout_raid0 *r0;
364         struct lu_site          *site;
365         struct lu_site_bkt_data *bkt;
366         wait_queue_t          *waiter;
367
368         r0  = &lov->u.raid0;
369         LASSERT(r0->lo_sub[idx] == los);
370
371         sub  = lovsub2cl(los);
372         site = sub->co_lu.lo_dev->ld_site;
373         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
374
375         cl_object_kill(env, sub);
376         /* release a reference to the sub-object and ... */
377         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
378         cl_object_put(env, sub);
379
380         /* ... wait until it is actually destroyed---sub-object clears its
381          * ->lo_sub[] slot in lovsub_object_fini() */
382         if (r0->lo_sub[idx] == los) {
383                 waiter = &lov_env_info(env)->lti_waiter;
384                 init_waitqueue_entry(waiter, current);
385                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
386                 set_current_state(TASK_UNINTERRUPTIBLE);
387                 while (1) {
388                         /* this wait-queue is signaled at the end of
389                          * lu_object_free(). */
390                         set_current_state(TASK_UNINTERRUPTIBLE);
391                         spin_lock(&r0->lo_sub_lock);
392                         if (r0->lo_sub[idx] == los) {
393                                 spin_unlock(&r0->lo_sub_lock);
394                                 schedule();
395                         } else {
396                                 spin_unlock(&r0->lo_sub_lock);
397                                 set_current_state(TASK_RUNNING);
398                                 break;
399                         }
400                 }
401                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
402         }
403         LASSERT(r0->lo_sub[idx] == NULL);
404 }
405
406 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
407                             union lov_layout_state *state)
408 {
409         struct lov_layout_raid0 *r0 = &state->raid0;
410         struct lov_stripe_md    *lsm = lov->lo_lsm;
411         int i;
412
413         ENTRY;
414
415         dump_lsm(D_INODE, lsm);
416
417         lov_layout_wait(env, lov);
418         if (r0->lo_sub != NULL) {
419                 for (i = 0; i < r0->lo_nr; ++i) {
420                         struct lovsub_object *los = r0->lo_sub[i];
421
422                         if (los != NULL) {
423                                 cl_object_prune(env, &los->lso_cl);
424                                 /*
425                                  * If top-level object is to be evicted from
426                                  * the cache, so are its sub-objects.
427                                  */
428                                 lov_subobject_kill(env, lov, los, i);
429                         }
430                 }
431         }
432         RETURN(0);
433 }
434
435 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
436                            union lov_layout_state *state)
437 {
438         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
439 }
440
441 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
442                            union lov_layout_state *state)
443 {
444         struct lov_layout_raid0 *r0 = &state->raid0;
445         ENTRY;
446
447         if (r0->lo_sub != NULL) {
448                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
449                 r0->lo_sub = NULL;
450         }
451
452         dump_lsm(D_INODE, lov->lo_lsm);
453         lov_free_memmd(&lov->lo_lsm);
454
455         EXIT;
456 }
457
458 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
459                                 union lov_layout_state *state)
460 {
461         ENTRY;
462         dump_lsm(D_INODE, lov->lo_lsm);
463         lov_free_memmd(&lov->lo_lsm);
464         EXIT;
465 }
466
467 static int lov_print_empty(const struct lu_env *env, void *cookie,
468                            lu_printer_t p, const struct lu_object *o)
469 {
470         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
471         return 0;
472 }
473
474 static int lov_print_raid0(const struct lu_env *env, void *cookie,
475                            lu_printer_t p, const struct lu_object *o)
476 {
477         struct lov_object       *lov = lu2lov(o);
478         struct lov_layout_raid0 *r0  = lov_r0(lov);
479         struct lov_stripe_md    *lsm = lov->lo_lsm;
480         int                      i;
481
482         (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
483                 r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
484                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
485                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
486         for (i = 0; i < r0->lo_nr; ++i) {
487                 struct lu_object *sub;
488
489                 if (r0->lo_sub[i] != NULL) {
490                         sub = lovsub2lu(r0->lo_sub[i]);
491                         lu_object_print(env, cookie, p, sub);
492                 } else {
493                         (*p)(env, cookie, "sub %d absent\n", i);
494                 }
495         }
496         return 0;
497 }
498
499 static int lov_print_released(const struct lu_env *env, void *cookie,
500                                 lu_printer_t p, const struct lu_object *o)
501 {
502         struct lov_object       *lov = lu2lov(o);
503         struct lov_stripe_md    *lsm = lov->lo_lsm;
504
505         (*p)(env, cookie,
506                 "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
507                 lov->lo_layout_invalid ? "invalid" : "valid", lsm,
508                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
509                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
510         return 0;
511 }
512
513 /**
514  * Implements cl_object_operations::coo_attr_get() method for an object
515  * without stripes (LLT_EMPTY layout type).
516  *
517  * The only attributes this layer is authoritative in this case is
518  * cl_attr::cat_blocks---it's 0.
519  */
520 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
521                               struct cl_attr *attr)
522 {
523         attr->cat_blocks = 0;
524         return 0;
525 }
526
527 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
528                               struct cl_attr *attr)
529 {
530         struct lov_object       *lov = cl2lov(obj);
531         struct lov_layout_raid0 *r0 = lov_r0(lov);
532         struct cl_attr          *lov_attr = &r0->lo_attr;
533         int                      result = 0;
534
535         ENTRY;
536
537         /* this is called w/o holding type guard mutex, so it must be inside
538          * an on going IO otherwise lsm may be replaced.
539          * LU-2117: it turns out there exists one exception. For mmaped files,
540          * the lock of those files may be requested in the other file's IO
541          * context, and this function is called in ccc_lock_state(), it will
542          * hit this assertion.
543          * Anyway, it's still okay to call attr_get w/o type guard as layout
544          * can't go if locks exist. */
545         /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
546
547         if (!r0->lo_attr_valid) {
548                 struct lov_stripe_md    *lsm = lov->lo_lsm;
549                 struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
550                 __u64                    kms = 0;
551
552                 memset(lvb, 0, sizeof(*lvb));
553                 /* XXX: timestamps can be negative by sanity:test_39m,
554                  * how can it be? */
555                 lvb->lvb_atime = LLONG_MIN;
556                 lvb->lvb_ctime = LLONG_MIN;
557                 lvb->lvb_mtime = LLONG_MIN;
558
559                 /*
560                  * XXX that should be replaced with a loop over sub-objects,
561                  * doing cl_object_attr_get() on them. But for now, let's
562                  * reuse old lov code.
563                  */
564
565                 /*
566                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
567                  * happy. It's not needed, because new code uses
568                  * ->coh_attr_guard spin-lock to protect consistency of
569                  * sub-object attributes.
570                  */
571                 lov_stripe_lock(lsm);
572                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
573                 lov_stripe_unlock(lsm);
574                 if (result == 0) {
575                         cl_lvb2attr(lov_attr, lvb);
576                         lov_attr->cat_kms = kms;
577                         r0->lo_attr_valid = 1;
578                 }
579         }
580         if (result == 0) { /* merge results */
581                 attr->cat_blocks = lov_attr->cat_blocks;
582                 attr->cat_size = lov_attr->cat_size;
583                 attr->cat_kms = lov_attr->cat_kms;
584                 if (attr->cat_atime < lov_attr->cat_atime)
585                         attr->cat_atime = lov_attr->cat_atime;
586                 if (attr->cat_ctime < lov_attr->cat_ctime)
587                         attr->cat_ctime = lov_attr->cat_ctime;
588                 if (attr->cat_mtime < lov_attr->cat_mtime)
589                         attr->cat_mtime = lov_attr->cat_mtime;
590         }
591         RETURN(result);
592 }
593
594 const static struct lov_layout_operations lov_dispatch[] = {
595         [LLT_EMPTY] = {
596                 .llo_init      = lov_init_empty,
597                 .llo_delete    = lov_delete_empty,
598                 .llo_fini      = lov_fini_empty,
599                 .llo_install   = lov_install_empty,
600                 .llo_print     = lov_print_empty,
601                 .llo_page_init = lov_page_init_empty,
602                 .llo_lock_init = lov_lock_init_empty,
603                 .llo_io_init   = lov_io_init_empty,
604                 .llo_getattr   = lov_attr_get_empty,
605         },
606         [LLT_RAID0] = {
607                 .llo_init      = lov_init_raid0,
608                 .llo_delete    = lov_delete_raid0,
609                 .llo_fini      = lov_fini_raid0,
610                 .llo_install   = lov_install_raid0,
611                 .llo_print     = lov_print_raid0,
612                 .llo_page_init = lov_page_init_raid0,
613                 .llo_lock_init = lov_lock_init_raid0,
614                 .llo_io_init   = lov_io_init_raid0,
615                 .llo_getattr   = lov_attr_get_raid0,
616         },
617         [LLT_RELEASED] = {
618                 .llo_init      = lov_init_released,
619                 .llo_delete    = lov_delete_empty,
620                 .llo_fini      = lov_fini_released,
621                 .llo_install   = lov_install_empty,
622                 .llo_print     = lov_print_released,
623                 .llo_page_init = lov_page_init_empty,
624                 .llo_lock_init = lov_lock_init_empty,
625                 .llo_io_init   = lov_io_init_released,
626                 .llo_getattr   = lov_attr_get_empty,
627         }
628 };
629
/**
 * Calls method \a op of the lov_dispatch[] entry selected by the layout
 * type of \a obj, passing the remaining arguments through. No lock is
 * taken. The value of the macro is the return value of the method.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
({                                                                      \
        struct lov_object    *__lov = (obj);                            \
        enum lov_layout_type  __llt = __lov->lo_type;                   \
                                                                        \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
})
642
643 /**
644  * Return lov_layout_type associated with a given lsm
645  */
646 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
647 {
648         if (lsm == NULL)
649                 return LLT_EMPTY;
650         if (lsm_is_released(lsm))
651                 return LLT_RELEASED;
652         return LLT_RAID0;
653 }
654
655 static inline void lov_conf_freeze(struct lov_object *lov)
656 {
657         CDEBUG(D_INODE, "To take share lov(%p) owner %p/%p\n",
658                 lov, lov->lo_owner, current);
659         if (lov->lo_owner != current)
660                 down_read(&lov->lo_type_guard);
661 }
662
663 static inline void lov_conf_thaw(struct lov_object *lov)
664 {
665         CDEBUG(D_INODE, "To release share lov(%p) owner %p/%p\n",
666                 lov, lov->lo_owner, current);
667         if (lov->lo_owner != current)
668                 up_read(&lov->lo_type_guard);
669 }
670
/**
 * Performs a double-dispatch based on the layout type of \a obj, bracketed
 * by lov_conf_freeze()/lov_conf_thaw() when \a lock is non-zero. The value
 * of the macro is the return value of method \a op.
 *
 * \a obj is evaluated exactly once: the dispatch is done here on the
 * captured pointer rather than by handing \a obj to
 * LOV_2DISPATCH_NOLOCK(), which would expand the argument a second time.
 * The layout type is read only after the guard has been taken.
 */
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
({                                                                      \
        struct lov_object                      *__obj = (obj);          \
        int                                     __lock = !!(lock);      \
        enum lov_layout_type                    __llt;                  \
        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
                                                                        \
        if (__lock)                                                     \
                lov_conf_freeze(__obj);                                 \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        __result = lov_dispatch[__llt].op(__VA_ARGS__);                 \
        if (__lock)                                                     \
                lov_conf_thaw(__obj);                                   \
        __result;                                                       \
})
684
/**
 * Locked double-dispatch on the layout type of \a obj: shorthand for
 * LOV_2DISPATCH_MAYLOCK() with locking always enabled.
 */
#define LOV_2DISPATCH(obj, op, ...)                                     \
        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
690
/**
 * Locked double-dispatch for methods that return nothing. The layout type
 * of \a obj is read between lov_conf_freeze() and lov_conf_thaw(), and the
 * selected method \a op is called inside the same section.
 */
#define LOV_2DISPATCH_VOID(obj, op, ...)                                \
do {                                                                    \
        struct lov_object    *__lov = (obj);                            \
        enum lov_layout_type  __llt;                                    \
                                                                        \
        lov_conf_freeze(__lov);                                         \
        __llt = __lov->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
        lov_conf_thaw(__lov);                                           \
} while (0)
702
703 static void lov_conf_lock(struct lov_object *lov)
704 {
705         LASSERT(lov->lo_owner != current);
706         down_write(&lov->lo_type_guard);
707         LASSERT(lov->lo_owner == NULL);
708         lov->lo_owner = current;
709         CDEBUG(D_INODE, "Took exclusive lov(%p) owner %p\n",
710                 lov, lov->lo_owner);
711 }
712
713 static void lov_conf_unlock(struct lov_object *lov)
714 {
715         CDEBUG(D_INODE, "To release exclusive lov(%p) owner %p\n",
716                 lov, lov->lo_owner);
717         lov->lo_owner = NULL;
718         up_write(&lov->lo_type_guard);
719 }
720
721 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
722 {
723         struct l_wait_info lwi = { 0 };
724         ENTRY;
725
726         while (atomic_read(&lov->lo_active_ios) > 0) {
727                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
728                         PFID(lu_object_fid(lov2lu(lov))),
729                         atomic_read(&lov->lo_active_ios));
730
731                 l_wait_event(lov->lo_waitq,
732                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
733         }
734         RETURN(0);
735 }
736
737 static int lov_layout_change(const struct lu_env *unused,
738                              struct lov_object *lov, struct lov_stripe_md *lsm,
739                              const struct cl_object_conf *conf)
740 {
741         enum lov_layout_type llt = lov_type(lsm);
742         union lov_layout_state *state = &lov->u;
743         const struct lov_layout_operations *old_ops;
744         const struct lov_layout_operations *new_ops;
745         struct lov_device *lov_dev = lov_object_dev(lov);
746         struct lu_env *env;
747         __u16 refcheck;
748         int rc;
749         ENTRY;
750
751         LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
752
753         env = cl_env_get(&refcheck);
754         if (IS_ERR(env))
755                 RETURN(PTR_ERR(env));
756
757         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
758
759         CDEBUG(D_INODE, DFID" from %s to %s\n",
760                PFID(lu_object_fid(lov2lu(lov))),
761                llt2str(lov->lo_type), llt2str(llt));
762
763         old_ops = &lov_dispatch[lov->lo_type];
764         new_ops = &lov_dispatch[llt];
765
766         rc = cl_object_prune(env, &lov->lo_cl);
767         if (rc != 0)
768                 GOTO(out, rc);
769
770         rc = old_ops->llo_delete(env, lov, &lov->u);
771         if (rc != 0)
772                 GOTO(out, rc);
773
774         old_ops->llo_fini(env, lov, &lov->u);
775
776         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
777
778         CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
779                PFID(lu_object_fid(lov2lu(lov))), lov, llt);
780
781         lov->lo_type = LLT_EMPTY;
782
783         /* page bufsize fixup */
784         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
785                 lov_page_slice_fixup(lov, NULL);
786
787         rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
788         if (rc != 0) {
789                 struct obd_device *obd = lov2obd(lov_dev->ld_lov);
790
791                 CERROR("%s: cannot apply new layout on "DFID" : rc = %d\n",
792                        obd->obd_name, PFID(lu_object_fid(lov2lu(lov))), rc);
793                 new_ops->llo_delete(env, lov, state);
794                 new_ops->llo_fini(env, lov, state);
795                 /* this file becomes an EMPTY file. */
796                 GOTO(out, rc);
797         }
798
799         new_ops->llo_install(env, lov, state);
800         lov->lo_type = llt;
801
802 out:
803         cl_env_put(env, &refcheck);
804         RETURN(rc);
805 }
806
807 /*****************************************************************************
808  *
809  * Lov object operations.
810  *
811  */
812 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
813                     const struct lu_object_conf *conf)
814 {
815         struct lov_object            *lov   = lu2lov(obj);
816         struct lov_device            *dev   = lov_object_dev(lov);
817         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
818         union lov_layout_state       *set   = &lov->u;
819         const struct lov_layout_operations *ops;
820         struct lov_stripe_md *lsm = NULL;
821         int rc;
822         ENTRY;
823
824         init_rwsem(&lov->lo_type_guard);
825         atomic_set(&lov->lo_active_ios, 0);
826         init_waitqueue_head(&lov->lo_waitq);
827         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
828
829         lov->lo_type = LLT_EMPTY;
830         if (cconf->u.coc_layout.lb_buf != NULL) {
831                 lsm = lov_unpackmd(dev->ld_lov,
832                                    cconf->u.coc_layout.lb_buf,
833                                    cconf->u.coc_layout.lb_len);
834                 if (IS_ERR(lsm))
835                         RETURN(PTR_ERR(lsm));
836         }
837
838         /* no locking is necessary, as object is being created */
839         lov->lo_type = lov_type(lsm);
840         ops = &lov_dispatch[lov->lo_type];
841         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
842         if (rc != 0)
843                 GOTO(out_lsm, rc);
844
845         ops->llo_install(env, lov, set);
846
847 out_lsm:
848         lov_lsm_put(lsm);
849
850         RETURN(rc);
851 }
852
853 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
854                         const struct cl_object_conf *conf)
855 {
856         struct lov_stripe_md    *lsm = NULL;
857         struct lov_object       *lov = cl2lov(obj);
858         int                      result = 0;
859         ENTRY;
860
861         if (conf->coc_opc == OBJECT_CONF_SET &&
862             conf->u.coc_layout.lb_buf != NULL) {
863                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
864                                    conf->u.coc_layout.lb_buf,
865                                    conf->u.coc_layout.lb_len);
866                 if (IS_ERR(lsm))
867                         RETURN(PTR_ERR(lsm));
868         }
869
870         lov_conf_lock(lov);
871         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
872                 lov->lo_layout_invalid = true;
873                 GOTO(out, result = 0);
874         }
875
876         if (conf->coc_opc == OBJECT_CONF_WAIT) {
877                 if (lov->lo_layout_invalid &&
878                     atomic_read(&lov->lo_active_ios) > 0) {
879                         lov_conf_unlock(lov);
880                         result = lov_layout_wait(env, lov);
881                         lov_conf_lock(lov);
882                 }
883                 GOTO(out, result);
884         }
885
886         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
887
888         if ((lsm == NULL && lov->lo_lsm == NULL) ||
889             ((lsm != NULL && lov->lo_lsm != NULL) &&
890              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
891              (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
892                 /* same version of layout */
893                 lov->lo_layout_invalid = false;
894                 GOTO(out, result = 0);
895         }
896
897         /* will change layout - check if there still exists active IO. */
898         if (atomic_read(&lov->lo_active_ios) > 0) {
899                 lov->lo_layout_invalid = true;
900                 GOTO(out, result = -EBUSY);
901         }
902
903         result = lov_layout_change(env, lov, lsm, conf);
904         lov->lo_layout_invalid = result != 0;
905         EXIT;
906
907 out:
908         lov_conf_unlock(lov);
909         lov_lsm_put(lsm);
910         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
911                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
912         RETURN(result);
913 }
914
915 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
916 {
917         struct lov_object *lov = lu2lov(obj);
918
919         ENTRY;
920         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
921         EXIT;
922 }
923
924 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
925 {
926         struct lov_object *lov = lu2lov(obj);
927
928         ENTRY;
929         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
930         lu_object_fini(obj);
931         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
932         EXIT;
933 }
934
935 static int lov_object_print(const struct lu_env *env, void *cookie,
936                             lu_printer_t p, const struct lu_object *o)
937 {
938         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
939 }
940
941 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
942                   struct cl_page *page, pgoff_t index)
943 {
944         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
945                                     index);
946 }
947
948 /**
949  * Implements cl_object_operations::clo_io_init() method for lov
950  * layer. Dispatches to the appropriate layout io initialization method.
951  */
952 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
953                 struct cl_io *io)
954 {
955         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
956
957         CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
958                PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
959                io->ci_ignore_layout, io->ci_verify_layout);
960
961         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
962                                      !io->ci_ignore_layout, env, obj, io);
963 }
964
965 /**
966  * An implementation of cl_object_operations::clo_attr_get() method for lov
967  * layer. For raid0 layout this collects and merges attributes of all
968  * sub-objects.
969  */
970 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
971                         struct cl_attr *attr)
972 {
973         /* do not take lock, as this function is called under a
974          * spin-lock. Layout is protected from changing by ongoing IO. */
975         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
976 }
977
/**
 * Attribute update entry point of the lov layer (installed as
 * coo_attr_update in lov_ops).
 *
 * No layout implements this operation, so there is nothing to dispatch to
 * and all arguments are ignored.
 *
 * \retval 0	always
 */
static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
                           const struct cl_attr *attr, unsigned valid)
{
        return 0;
}
986
987 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
988                   struct cl_lock *lock, const struct cl_io *io)
989 {
990         /* No need to lock because we've taken one refcount of layout.  */
991         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
992                                     io);
993 }
994
995 /**
996  * We calculate on which OST the mapping will end. If the length of mapping
997  * is greater than (stripe_size * stripe_count) then the last_stripe will
998  * will be one just before start_stripe. Else we check if the mapping
999  * intersects each OST and find last_stripe.
1000  * This function returns the last_stripe and also sets the stripe_count
1001  * over which the mapping is spread
1002  *
1003  * \param lsm [in]              striping information for the file
1004  * \param fm_start [in]         logical start of mapping
1005  * \param fm_end [in]           logical end of mapping
1006  * \param start_stripe [in]     starting stripe of the mapping
1007  * \param stripe_count [out]    the number of stripes across which to map is
1008  *                              returned
1009  *
1010  * \retval last_stripe          return the last stripe of the mapping
1011  */
1012 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
1013                                    loff_t fm_start, loff_t fm_end,
1014                                    int start_stripe, int *stripe_count)
1015 {
1016         int last_stripe;
1017         loff_t obd_start;
1018         loff_t obd_end;
1019         int i, j;
1020
1021         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
1022                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
1023                                                               start_stripe - 1);
1024                 *stripe_count = lsm->lsm_stripe_count;
1025         } else {
1026                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
1027                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
1028                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1029                                                    &obd_start, &obd_end)) == 0)
1030                                 break;
1031                 }
1032                 *stripe_count = j;
1033                 last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count;
1034         }
1035
1036         return last_stripe;
1037 }
1038
1039 /**
1040  * Set fe_device and copy extents from local buffer into main return buffer.
1041  *
1042  * \param fiemap [out]          fiemap to hold all extents
1043  * \param lcl_fm_ext [in]       array of fiemap extents get from OSC layer
1044  * \param ost_index [in]        OST index to be written into the fm_device
1045  *                              field for each extent
1046  * \param ext_count [in]        number of extents to be copied
1047  * \param current_extent [in]   where to start copying in the extent array
1048  */
1049 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1050                                          struct fiemap_extent *lcl_fm_ext,
1051                                          int ost_index, unsigned int ext_count,
1052                                          int current_extent)
1053 {
1054         char            *to;
1055         unsigned int    ext;
1056
1057         for (ext = 0; ext < ext_count; ext++) {
1058                 lcl_fm_ext[ext].fe_device = ost_index;
1059                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1060         }
1061
1062         /* Copy fm_extent's from fm_local to return buffer */
1063         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1064         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
1065 }
1066
1067 #define FIEMAP_BUFFER_SIZE 4096
1068
1069 /**
1070  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1071  * call. The local end offset and the device are sent in the first
1072  * fm_extent. This function calculates the stripe number from the index.
1073  * This function returns a stripe_no on which mapping is to be restarted.
1074  *
1075  * This function returns fm_end_offset which is the in-OST offset at which
1076  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1077  * will re-calculate proper offset in next stripe.
1078  * Note that the first extent is passed to lov_get_info via the value field.
1079  *
1080  * \param fiemap [in]           fiemap request header
1081  * \param lsm [in]              striping information for the file
1082  * \param fm_start [in]         logical start of mapping
1083  * \param fm_end [in]           logical end of mapping
1084  * \param start_stripe [out]    starting stripe will be returned in this
1085  */
1086 static loff_t fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1087                                         struct lov_stripe_md *lsm,
1088                                         loff_t fm_start, loff_t fm_end,
1089                                         int *start_stripe)
1090 {
1091         loff_t local_end = fiemap->fm_extents[0].fe_logical;
1092         loff_t lun_start;
1093         loff_t lun_end;
1094         loff_t fm_end_offset;
1095         int stripe_no = -1;
1096         int i;
1097
1098         if (fiemap->fm_extent_count == 0 ||
1099             fiemap->fm_extents[0].fe_logical == 0)
1100                 return 0;
1101
1102         /* Find out stripe_no from ost_index saved in the fe_device */
1103         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1104                 struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
1105
1106                 if (lov_oinfo_is_dummy(oinfo))
1107                         continue;
1108
1109                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1110                         stripe_no = i;
1111                         break;
1112                 }
1113         }
1114
1115         if (stripe_no == -1)
1116                 return -EINVAL;
1117
1118         /* If we have finished mapping on previous device, shift logical
1119          * offset to start of next device */
1120         if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1121                                   &lun_start, &lun_end) != 0 &&
1122             local_end < lun_end) {
1123                 fm_end_offset = local_end;
1124                 *start_stripe = stripe_no;
1125         } else {
1126                 /* This is a special value to indicate that caller should
1127                  * calculate offset in next stripe. */
1128                 fm_end_offset = 0;
1129                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
1130         }
1131
1132         return fm_end_offset;
1133 }
1134
1135 /**
1136  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1137  * This also handles the restarting of FIEMAP calls in case mapping overflows
1138  * the available number of extents in single call.
1139  *
1140  * \param env [in]              lustre environment
1141  * \param obj [in]              file object
1142  * \param fmkey [in]            fiemap request header and other info
1143  * \param fiemap [out]          fiemap buffer holding retrived map extents
1144  * \param buflen [in/out]       max buffer length of @fiemap, when iterate
1145  *                              each OST, it is used to limit max map needed
1146  * \retval 0    success
1147  * \retval < 0  error
1148  */
1149 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1150                              struct ll_fiemap_info_key *fmkey,
1151                              struct fiemap *fiemap, size_t *buflen)
1152 {
1153         struct lov_stripe_md    *lsm;
1154         struct cl_object        *subobj = NULL;
1155         struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1156         struct fiemap           *fm_local = NULL;
1157         struct fiemap_extent    *lcl_fm_ext;
1158         loff_t                  fm_start;
1159         loff_t                  fm_end;
1160         loff_t                  fm_length;
1161         loff_t                  fm_end_offset;
1162         int                     count_local;
1163         int                     ost_index = 0;
1164         int                     start_stripe;
1165         int                     current_extent = 0;
1166         int                     rc = 0;
1167         int                     last_stripe;
1168         int                     cur_stripe = 0;
1169         int                     cur_stripe_wrap = 0;
1170         int                     stripe_count;
1171         unsigned int            buffer_size = FIEMAP_BUFFER_SIZE;
1172         /* Whether have we collected enough extents */
1173         bool                    enough = false;
1174         /* EOF for object */
1175         bool                    ost_eof = false;
1176         /* done with required mapping for this OST? */
1177         bool                    ost_done = false;
1178         ENTRY;
1179
1180         lsm = lov_lsm_addref(cl2lov(obj));
1181         if (lsm == NULL)
1182                 RETURN(-ENODATA);
1183
1184         /**
1185          * If the stripe_count > 1 and the application does not understand
1186          * DEVICE_ORDER flag, it cannot interpret the extents correctly.
1187          */
1188         if (lsm->lsm_stripe_count > 1 && !(fiemap->fm_flags &
1189                                            FIEMAP_FLAG_DEVICE_ORDER))
1190                 GOTO(out_lsm, rc = -ENOTSUPP);
1191
1192         if (lsm_is_released(lsm)) {
1193                 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1194                         /**
1195                          * released file, return a minimal FIEMAP if
1196                          * request fits in file-size.
1197                          */
1198                         fiemap->fm_mapped_extents = 1;
1199                         fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1200                         if (fiemap->fm_start + fiemap->fm_length <
1201                             fmkey->lfik_oa.o_size)
1202                                 fiemap->fm_extents[0].fe_length =
1203                                         fiemap->fm_length;
1204                         else
1205                                 fiemap->fm_extents[0].fe_length =
1206                                         fmkey->lfik_oa.o_size -
1207                                         fiemap->fm_start;
1208                         fiemap->fm_extents[0].fe_flags |=
1209                                 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1210                 }
1211                 GOTO(out_lsm, rc = 0);
1212         }
1213
1214         if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1215                 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1216
1217         OBD_ALLOC_LARGE(fm_local, buffer_size);
1218         if (fm_local == NULL)
1219                 GOTO(out_lsm, rc = -ENOMEM);
1220         lcl_fm_ext = &fm_local->fm_extents[0];
1221         count_local = fiemap_size_to_count(buffer_size);
1222
1223         fm_start = fiemap->fm_start;
1224         fm_length = fiemap->fm_length;
1225         /* Calculate start stripe, last stripe and length of mapping */
1226         start_stripe = lov_stripe_number(lsm, fm_start);
1227         fm_end = (fm_length == ~0ULL) ? fmkey->lfik_oa.o_size :
1228                                         fm_start + fm_length - 1;
1229         /* If fm_length != ~0ULL but fm_start_fm_length-1 exceeds file size */
1230         if (fm_end > fmkey->lfik_oa.o_size)
1231                 fm_end = fmkey->lfik_oa.o_size;
1232
1233         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
1234                                               start_stripe, &stripe_count);
1235         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
1236                                                   &start_stripe);
1237         if (fm_end_offset == -EINVAL)
1238                 GOTO(out_fm_local, rc = -EINVAL);
1239
1240         /**
1241          * Requested extent count exceeds the fiemap buffer size, shrink our
1242          * ambition.
1243          */
1244         if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1245                 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1246         if (fiemap->fm_extent_count == 0)
1247                 count_local = 0;
1248
1249         /* Check each stripe */
1250         for (cur_stripe = start_stripe; stripe_count > 0;
1251              --stripe_count,
1252              cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
1253                 loff_t req_fm_len; /* Stores length of required mapping */
1254                 loff_t len_mapped_single_call;
1255                 loff_t lun_start;
1256                 loff_t lun_end;
1257                 loff_t obd_object_end;
1258                 unsigned int ext_count;
1259
1260                 cur_stripe_wrap = cur_stripe;
1261
1262                 /* Find out range of mapping on this stripe */
1263                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
1264                                            &lun_start, &obd_object_end)) == 0)
1265                         continue;
1266
1267                 if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe]))
1268                         GOTO(out_fm_local, rc = -EIO);
1269
1270                 /* If this is a continuation FIEMAP call and we are on
1271                  * starting stripe then lun_start needs to be set to
1272                  * fm_end_offset */
1273                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
1274                         lun_start = fm_end_offset;
1275
1276                 if (fm_length != ~0ULL) {
1277                         /* Handle fm_start + fm_length overflow */
1278                         if (fm_start + fm_length < fm_start)
1279                                 fm_length = ~0ULL - fm_start;
1280                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
1281                                                      cur_stripe);
1282                 } else {
1283                         lun_end = ~0ULL;
1284                 }
1285
1286                 if (lun_start == lun_end)
1287                         continue;
1288
1289                 req_fm_len = obd_object_end - lun_start;
1290                 fm_local->fm_length = 0;
1291                 len_mapped_single_call = 0;
1292
1293                 /* find lobsub object */
1294                 subobj = lov_find_subobj(env, cl2lov(obj), lsm,
1295                                              cur_stripe);
1296                 if (IS_ERR(subobj))
1297                         GOTO(out_fm_local, rc = PTR_ERR(subobj));
1298                 /* If the output buffer is very large and the objects have many
1299                  * extents we may need to loop on a single OST repeatedly */
1300                 ost_eof = false;
1301                 ost_done = false;
1302                 do {
1303                         if (fiemap->fm_extent_count > 0) {
1304                                 /* Don't get too many extents. */
1305                                 if (current_extent + count_local >
1306                                     fiemap->fm_extent_count)
1307                                         count_local = fiemap->fm_extent_count -
1308                                                       current_extent;
1309                         }
1310
1311                         lun_start += len_mapped_single_call;
1312                         fm_local->fm_length = req_fm_len -
1313                                               len_mapped_single_call;
1314                         req_fm_len = fm_local->fm_length;
1315                         fm_local->fm_extent_count = enough ? 1 : count_local;
1316                         fm_local->fm_mapped_extents = 0;
1317                         fm_local->fm_flags = fiemap->fm_flags;
1318
1319                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
1320
1321                         if (ost_index < 0 ||
1322                             ost_index >= lov->desc.ld_tgt_count)
1323                                 GOTO(obj_put, rc = -EINVAL);
1324                         /* If OST is inactive, return extent with UNKNOWN
1325                          * flag. */
1326                         if (!lov->lov_tgts[ost_index]->ltd_active) {
1327                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
1328                                 fm_local->fm_mapped_extents = 1;
1329
1330                                 lcl_fm_ext[0].fe_logical = lun_start;
1331                                 lcl_fm_ext[0].fe_length = obd_object_end -
1332                                                           lun_start;
1333                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1334
1335                                 goto inactive_tgt;
1336                         }
1337
1338                         fm_local->fm_start = lun_start;
1339                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1340                         memcpy(&fmkey->lfik_fiemap, fm_local,
1341                                sizeof(*fm_local));
1342                         *buflen = fiemap_count_to_size(
1343                                                 fm_local->fm_extent_count);
1344
1345                         rc = cl_object_fiemap(env, subobj, fmkey, fm_local,
1346                                               buflen);
1347                         if (rc != 0)
1348                                 GOTO(obj_put, rc);
1349 inactive_tgt:
1350                         ext_count = fm_local->fm_mapped_extents;
1351                         if (ext_count == 0) {
1352                                 ost_done = true;
1353                                 /* If last stripe has hold at the end,
1354                                  * we need to return */
1355                                 if (cur_stripe_wrap == last_stripe) {
1356                                         fiemap->fm_mapped_extents = 0;
1357                                         goto finish;
1358                                 }
1359                                 break;
1360                         } else if (enough) {
1361                                 /*
1362                                  * We've collected enough extents and there are
1363                                  * more extents after it.
1364                                  */
1365                                 goto finish;
1366                         }
1367
1368                         /* If we just need num of extents, got to next device */
1369                         if (fiemap->fm_extent_count == 0) {
1370                                 current_extent += ext_count;
1371                                 break;
1372                         }
1373
1374                         /* prepare to copy retrived map extents */
1375                         len_mapped_single_call =
1376                                 lcl_fm_ext[ext_count - 1].fe_logical -
1377                                 lun_start + lcl_fm_ext[ext_count - 1].fe_length;
1378
1379                         /* Have we finished mapping on this device? */
1380                         if (req_fm_len <= len_mapped_single_call)
1381                                 ost_done = true;
1382
1383                         /* Clear the EXTENT_LAST flag which can be present on
1384                          * the last extent */
1385                         if (lcl_fm_ext[ext_count - 1].fe_flags &
1386                             FIEMAP_EXTENT_LAST)
1387                                 lcl_fm_ext[ext_count - 1].fe_flags &=
1388                                                         ~FIEMAP_EXTENT_LAST;
1389                         if (lov_stripe_size(lsm,
1390                                         lcl_fm_ext[ext_count - 1].fe_logical +
1391                                         lcl_fm_ext[ext_count - 1].fe_length,
1392                                         cur_stripe) >= fmkey->lfik_oa.o_size)
1393                                 ost_eof = true;
1394
1395                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
1396                                                      ost_index, ext_count,
1397                                                      current_extent);
1398                         current_extent += ext_count;
1399
1400                         /* Ran out of available extents? */
1401                         if (current_extent >= fiemap->fm_extent_count)
1402                                 enough = true;
1403                 } while (!ost_done && !ost_eof);
1404
1405                 cl_object_put(env, subobj);
1406                 subobj = NULL;
1407
1408                 if (cur_stripe_wrap == last_stripe)
1409                         goto finish;
1410         } /* for each stripe */
1411 finish:
1412         /* Indicate that we are returning device offsets unless file just has
1413          * single stripe */
1414         if (lsm->lsm_stripe_count > 1)
1415                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1416
1417         if (fiemap->fm_extent_count == 0)
1418                 goto skip_last_device_calc;
1419
1420         /* Check if we have reached the last stripe and whether mapping for that
1421          * stripe is done. */
1422         if ((cur_stripe_wrap == last_stripe) && (ost_done || ost_eof))
1423                 fiemap->fm_extents[current_extent - 1].fe_flags |=
1424                                                              FIEMAP_EXTENT_LAST;
1425 skip_last_device_calc:
1426         fiemap->fm_mapped_extents = current_extent;
1427 obj_put:
1428         if (subobj != NULL)
1429                 cl_object_put(env, subobj);
1430 out_fm_local:
1431         OBD_FREE_LARGE(fm_local, buffer_size);
1432
1433 out_lsm:
1434         lov_lsm_put(lsm);
1435
1436         return rc;
1437 }
1438
1439 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1440                                 struct lov_user_md __user *lum)
1441 {
1442         struct lov_object       *lov = cl2lov(obj);
1443         struct lov_stripe_md    *lsm;
1444         int                     rc = 0;
1445         ENTRY;
1446
1447         lsm = lov_lsm_addref(lov);
1448         if (lsm == NULL)
1449                 RETURN(-ENODATA);
1450
1451         rc = lov_getstripe(cl2lov(obj), lsm, lum);
1452         lov_lsm_put(lsm);
1453         RETURN(rc);
1454 }
1455
1456 static int lov_object_layout_get(const struct lu_env *env,
1457                                  struct cl_object *obj,
1458                                  struct cl_layout *cl)
1459 {
1460         struct lov_object *lov = cl2lov(obj);
1461         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1462         struct lu_buf *buf = &cl->cl_buf;
1463         ssize_t rc;
1464         ENTRY;
1465
1466         if (lsm == NULL) {
1467                 cl->cl_size = 0;
1468                 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1469
1470                 RETURN(0);
1471         }
1472
1473         cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
1474         cl->cl_layout_gen = lsm->lsm_layout_gen;
1475
1476         rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
1477         lov_lsm_put(lsm);
1478
1479         RETURN(rc < 0 ? rc : 0);
1480 }
1481
1482 static loff_t lov_object_maxbytes(struct cl_object *obj)
1483 {
1484         struct lov_object *lov = cl2lov(obj);
1485         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1486         loff_t maxbytes;
1487
1488         if (lsm == NULL)
1489                 return LLONG_MAX;
1490
1491         maxbytes = lsm->lsm_maxbytes;
1492
1493         lov_lsm_put(lsm);
1494
1495         return maxbytes;
1496 }
1497
1498 static const struct cl_object_operations lov_ops = {
1499         .coo_page_init    = lov_page_init,
1500         .coo_lock_init    = lov_lock_init,
1501         .coo_io_init      = lov_io_init,
1502         .coo_attr_get     = lov_attr_get,
1503         .coo_attr_update  = lov_attr_update,
1504         .coo_conf_set     = lov_conf_set,
1505         .coo_getstripe    = lov_object_getstripe,
1506         .coo_layout_get   = lov_object_layout_get,
1507         .coo_maxbytes     = lov_object_maxbytes,
1508         .coo_fiemap       = lov_object_fiemap,
1509 };
1510
1511 static const struct lu_object_operations lov_lu_obj_ops = {
1512         .loo_object_init      = lov_object_init,
1513         .loo_object_delete    = lov_object_delete,
1514         .loo_object_release   = NULL,
1515         .loo_object_free      = lov_object_free,
1516         .loo_object_print     = lov_object_print,
1517         .loo_object_invariant = NULL
1518 };
1519
1520 struct lu_object *lov_object_alloc(const struct lu_env *env,
1521                                    const struct lu_object_header *unused,
1522                                    struct lu_device *dev)
1523 {
1524         struct lov_object *lov;
1525         struct lu_object  *obj;
1526
1527         ENTRY;
1528         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1529         if (lov != NULL) {
1530                 obj = lov2lu(lov);
1531                 lu_object_init(obj, NULL, dev);
1532                 lov->lo_cl.co_ops = &lov_ops;
1533                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1534                 /*
1535                  * object io operation vector (cl_object::co_iop) is installed
1536                  * later in lov_object_init(), as different vectors are used
1537                  * for object with different layouts.
1538                  */
1539                 obj->lo_ops = &lov_lu_obj_ops;
1540         } else
1541                 obj = NULL;
1542         RETURN(obj);
1543 }
1544
1545 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1546 {
1547         struct lov_stripe_md *lsm = NULL;
1548
1549         lov_conf_freeze(lov);
1550         if (lov->lo_lsm != NULL) {
1551                 lsm = lsm_addref(lov->lo_lsm);
1552                 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1553                         lsm, atomic_read(&lsm->lsm_refc),
1554                         lov->lo_layout_invalid, current);
1555         }
1556         lov_conf_thaw(lov);
1557         return lsm;
1558 }
1559
1560 int lov_read_and_clear_async_rc(struct cl_object *clob)
1561 {
1562         struct lu_object *luobj;
1563         int rc = 0;
1564         ENTRY;
1565
1566         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1567                                  &lov_device_type);
1568         if (luobj != NULL) {
1569                 struct lov_object *lov = lu2lov(luobj);
1570
1571                 lov_conf_freeze(lov);
1572                 switch (lov->lo_type) {
1573                 case LLT_RAID0: {
1574                         struct lov_stripe_md *lsm;
1575                         int i;
1576
1577                         lsm = lov->lo_lsm;
1578                         LASSERT(lsm != NULL);
1579                         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1580                                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1581
1582                                 if (lov_oinfo_is_dummy(loi))
1583                                         continue;
1584
1585                                 if (loi->loi_ar.ar_rc && !rc)
1586                                         rc = loi->loi_ar.ar_rc;
1587                                 loi->loi_ar.ar_rc = 0;
1588                         }
1589                 }
1590                 case LLT_RELEASED:
1591                 case LLT_EMPTY:
1592                         break;
1593                 default:
1594                         LBUG();
1595                 }
1596                 lov_conf_thaw(lov);
1597         }
1598         RETURN(rc);
1599 }
1600 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1601
1602 /** @} lov */