Whamcloud - gitweb
bd10384a2e95e51bec97611a3a68534e886f12d9
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45
46 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
47 {
48         return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
49 }
50
51 /** \addtogroup lov
52  *  @{
53  */
54
55 /*****************************************************************************
56  *
57  * Layout operations.
58  *
59  */
60
61 struct lov_layout_operations {
62         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
63                         struct lov_object *lov, struct lov_stripe_md *lsm,
64                         const struct cl_object_conf *conf,
65                         union lov_layout_state *state);
66         int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
67                            union lov_layout_state *state);
68         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
69                          union lov_layout_state *state);
70         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
71                             union lov_layout_state *state);
72         int  (*llo_print)(const struct lu_env *env, void *cookie,
73                           lu_printer_t p, const struct lu_object *o);
74         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
75                               struct cl_page *page, pgoff_t index);
76         int  (*llo_lock_init)(const struct lu_env *env,
77                               struct cl_object *obj, struct cl_lock *lock,
78                               const struct cl_io *io);
79         int  (*llo_io_init)(const struct lu_env *env,
80                             struct cl_object *obj, struct cl_io *io);
81         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
82                             struct cl_attr *attr);
83 };
84
85 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
86
87 static void lov_lsm_put(struct lov_stripe_md *lsm)
88 {
89         if (lsm != NULL)
90                 lov_free_memmd(&lsm);
91 }
92
93 /*****************************************************************************
94  *
95  * Lov object layout operations.
96  *
97  */
98
/**
 * llo_install method shared by the LLT_EMPTY and LLT_RELEASED layouts.
 * A file without objects has nothing to install, so this is a no-op.
 */
static void lov_install_empty(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* Intentionally empty: there are no stripe objects to set up. */
}
107
/**
 * llo_init method for the LLT_EMPTY layout: there is no state to build.
 *
 * \retval 0 always
 */
static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
                          union lov_layout_state *state)
{
        /* Nothing is allocated or referenced for an object-less file. */
        return 0;
}
115
/**
 * llo_install method for the LLT_RAID0 layout. Currently a no-op: all work
 * is done by lov_init_raid0().
 */
static void lov_install_raid0(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* Intentionally empty. */
}
121
122 static struct cl_object *lov_sub_find(const struct lu_env *env,
123                                       struct cl_device *dev,
124                                       const struct lu_fid *fid,
125                                       const struct cl_object_conf *conf)
126 {
127         struct lu_object *o;
128
129         ENTRY;
130         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
131         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
132         RETURN(lu2cl(o));
133 }
134
135 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
136                         struct cl_object *stripe, struct lov_layout_raid0 *r0,
137                         int idx)
138 {
139         struct cl_object_header *hdr;
140         struct cl_object_header *subhdr;
141         struct cl_object_header *parent;
142         struct lov_oinfo        *oinfo;
143         int result;
144
145         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
146                 /* For sanity:test_206.
147                  * Do not leave the object in cache to avoid accessing
148                  * freed memory. This is because osc_object is referring to
149                  * lov_oinfo of lsm_stripe_data which will be freed due to
150                  * this failure. */
151                 cl_object_kill(env, stripe);
152                 cl_object_put(env, stripe);
153                 return -EIO;
154         }
155
156         hdr    = cl_object_header(lov2cl(lov));
157         subhdr = cl_object_header(stripe);
158
159         oinfo = lov->lo_lsm->lsm_oinfo[idx];
160         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
161                " idx: %d gen: %d\n",
162                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
163                PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
164                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
165
166         /* reuse ->coh_attr_guard to protect coh_parent change */
167         spin_lock(&subhdr->coh_attr_guard);
168         parent = subhdr->coh_parent;
169         if (parent == NULL) {
170                 subhdr->coh_parent = hdr;
171                 spin_unlock(&subhdr->coh_attr_guard);
172                 subhdr->coh_nesting = hdr->coh_nesting + 1;
173                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
174                 r0->lo_sub[idx] = cl2lovsub(stripe);
175                 r0->lo_sub[idx]->lso_super = lov;
176                 r0->lo_sub[idx]->lso_index = idx;
177                 result = 0;
178         } else {
179                 struct lu_object  *old_obj;
180                 struct lov_object *old_lov;
181                 unsigned int mask = D_INODE;
182
183                 spin_unlock(&subhdr->coh_attr_guard);
184                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
185                 LASSERT(old_obj != NULL);
186                 old_lov = cl2lov(lu2cl(old_obj));
187                 if (old_lov->lo_layout_invalid) {
188                         /* the object's layout has already changed but isn't
189                          * refreshed */
190                         lu_object_unhash(env, &stripe->co_lu);
191                         result = -EAGAIN;
192                 } else {
193                         mask = D_ERROR;
194                         result = -EIO;
195                 }
196
197                 LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
198                                 "stripe %d is already owned.", idx);
199                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
200                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
201                 cl_object_put(env, stripe);
202         }
203         return result;
204 }
205
206 static int lov_page_slice_fixup(struct lov_object *lov,
207                                 struct cl_object *stripe)
208 {
209         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
210         struct cl_object *o;
211
212         if (stripe == NULL)
213                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
214                        cfs_size_round(sizeof(struct lov_page));
215
216         cl_object_for_each(o, stripe)
217                 o->co_slice_off += hdr->coh_page_bufsize;
218
219         return cl_object_header(stripe)->coh_page_bufsize;
220 }
221
222 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
223                           struct lov_object *lov, struct lov_stripe_md *lsm,
224                           const struct cl_object_conf *conf,
225                           union lov_layout_state *state)
226 {
227         int result;
228         int i;
229
230         struct cl_object        *stripe;
231         struct lov_thread_info  *lti     = lov_env_info(env);
232         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
233         struct lu_fid           *ofid    = &lti->lti_fid;
234         struct lov_layout_raid0 *r0      = &state->raid0;
235
236         ENTRY;
237
238         if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
239                 dump_lsm(D_ERROR, lsm);
240                 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
241                          LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
242         }
243
244         LASSERT(lov->lo_lsm == NULL);
245         lov->lo_lsm = lsm_addref(lsm);
246         r0->lo_nr = lsm->lsm_stripe_count;
247         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
248
249         lov->lo_layout_invalid = true;
250
251         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
252         if (r0->lo_sub != NULL) {
253                 int psz = 0;
254
255                 result = 0;
256                 subconf->coc_inode = conf->coc_inode;
257                 spin_lock_init(&r0->lo_sub_lock);
258                 /*
259                  * Create stripe cl_objects.
260                  */
261                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
262                         struct cl_device *subdev;
263                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
264                         int ost_idx = oinfo->loi_ost_idx;
265
266                         if (lov_oinfo_is_dummy(oinfo))
267                                 continue;
268
269                         result = ostid_to_fid(ofid, &oinfo->loi_oi,
270                                               oinfo->loi_ost_idx);
271                         if (result != 0)
272                                 GOTO(out, result);
273
274                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
275                         subconf->u.coc_oinfo = oinfo;
276                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
277                         /* In the function below, .hs_keycmp resolves to
278                          * lu_obj_hop_keycmp() */
279                         /* coverity[overrun-buffer-val] */
280                         stripe = lov_sub_find(env, subdev, ofid, subconf);
281                         if (!IS_ERR(stripe)) {
282                                 result = lov_init_sub(env, lov, stripe, r0, i);
283                                 if (result == -EAGAIN) { /* try again */
284                                         --i;
285                                         result = 0;
286                                         continue;
287                                 }
288                         } else {
289                                 result = PTR_ERR(stripe);
290                         }
291
292                         if (result == 0) {
293                                 int sz = lov_page_slice_fixup(lov, stripe);
294                                 LASSERT(ergo(psz > 0, psz == sz));
295                                 psz = sz;
296                         }
297                 }
298                 if (result == 0)
299                         cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
300         } else
301                 result = -ENOMEM;
302 out:
303         RETURN(result);
304 }
305
306 static int lov_init_released(const struct lu_env *env,
307                              struct lov_device *dev, struct lov_object *lov,
308                              struct lov_stripe_md *lsm,
309                              const struct cl_object_conf *conf,
310                              union lov_layout_state *state)
311 {
312         LASSERT(lsm != NULL);
313         LASSERT(lsm_is_released(lsm));
314         LASSERT(lov->lo_lsm == NULL);
315
316         lov->lo_lsm = lsm_addref(lsm);
317         return 0;
318 }
319
320 static struct cl_object *lov_find_subobj(const struct lu_env *env,
321                                          struct lov_object *lov,
322                                          struct lov_stripe_md *lsm,
323                                          int stripe_idx)
324 {
325         struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
326         struct lov_oinfo        *oinfo = lsm->lsm_oinfo[stripe_idx];
327         struct lov_thread_info  *lti = lov_env_info(env);
328         struct lu_fid           *ofid = &lti->lti_fid;
329         struct cl_device        *subdev;
330         int                     ost_idx;
331         int                     rc;
332         struct cl_object        *result;
333
334         if (lov->lo_type != LLT_RAID0)
335                 GOTO(out, result = NULL);
336
337         ost_idx = oinfo->loi_ost_idx;
338         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
339         if (rc != 0)
340                 GOTO(out, result = NULL);
341
342         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
343         result = lov_sub_find(env, subdev, ofid, NULL);
344 out:
345         if (result == NULL)
346                 result = ERR_PTR(-EINVAL);
347         return result;
348 }
349
350 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
351                             union lov_layout_state *state)
352 {
353         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
354
355         lov_layout_wait(env, lov);
356         return 0;
357 }
358
359 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
360                                struct lovsub_object *los, int idx)
361 {
362         struct cl_object        *sub;
363         struct lov_layout_raid0 *r0;
364         struct lu_site          *site;
365         struct lu_site_bkt_data *bkt;
366         wait_queue_t          *waiter;
367
368         r0  = &lov->u.raid0;
369         LASSERT(r0->lo_sub[idx] == los);
370
371         sub  = lovsub2cl(los);
372         site = sub->co_lu.lo_dev->ld_site;
373         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
374
375         cl_object_kill(env, sub);
376         /* release a reference to the sub-object and ... */
377         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
378         cl_object_put(env, sub);
379
380         /* ... wait until it is actually destroyed---sub-object clears its
381          * ->lo_sub[] slot in lovsub_object_fini() */
382         if (r0->lo_sub[idx] == los) {
383                 waiter = &lov_env_info(env)->lti_waiter;
384                 init_waitqueue_entry(waiter, current);
385                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
386                 set_current_state(TASK_UNINTERRUPTIBLE);
387                 while (1) {
388                         /* this wait-queue is signaled at the end of
389                          * lu_object_free(). */
390                         set_current_state(TASK_UNINTERRUPTIBLE);
391                         spin_lock(&r0->lo_sub_lock);
392                         if (r0->lo_sub[idx] == los) {
393                                 spin_unlock(&r0->lo_sub_lock);
394                                 schedule();
395                         } else {
396                                 spin_unlock(&r0->lo_sub_lock);
397                                 set_current_state(TASK_RUNNING);
398                                 break;
399                         }
400                 }
401                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
402         }
403         LASSERT(r0->lo_sub[idx] == NULL);
404 }
405
406 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
407                             union lov_layout_state *state)
408 {
409         struct lov_layout_raid0 *r0 = &state->raid0;
410         struct lov_stripe_md    *lsm = lov->lo_lsm;
411         int i;
412
413         ENTRY;
414
415         dump_lsm(D_INODE, lsm);
416
417         lov_layout_wait(env, lov);
418         if (r0->lo_sub != NULL) {
419                 for (i = 0; i < r0->lo_nr; ++i) {
420                         struct lovsub_object *los = r0->lo_sub[i];
421
422                         if (los != NULL) {
423                                 cl_object_prune(env, &los->lso_cl);
424                                 /*
425                                  * If top-level object is to be evicted from
426                                  * the cache, so are its sub-objects.
427                                  */
428                                 lov_subobject_kill(env, lov, los, i);
429                         }
430                 }
431         }
432         RETURN(0);
433 }
434
435 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
436                            union lov_layout_state *state)
437 {
438         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
439 }
440
441 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
442                            union lov_layout_state *state)
443 {
444         struct lov_layout_raid0 *r0 = &state->raid0;
445         ENTRY;
446
447         if (r0->lo_sub != NULL) {
448                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
449                 r0->lo_sub = NULL;
450         }
451
452         dump_lsm(D_INODE, lov->lo_lsm);
453         lov_free_memmd(&lov->lo_lsm);
454
455         EXIT;
456 }
457
458 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
459                                 union lov_layout_state *state)
460 {
461         ENTRY;
462         dump_lsm(D_INODE, lov->lo_lsm);
463         lov_free_memmd(&lov->lo_lsm);
464         EXIT;
465 }
466
467 static int lov_print_empty(const struct lu_env *env, void *cookie,
468                            lu_printer_t p, const struct lu_object *o)
469 {
470         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
471         return 0;
472 }
473
474 static int lov_print_raid0(const struct lu_env *env, void *cookie,
475                            lu_printer_t p, const struct lu_object *o)
476 {
477         struct lov_object       *lov = lu2lov(o);
478         struct lov_layout_raid0 *r0  = lov_r0(lov);
479         struct lov_stripe_md    *lsm = lov->lo_lsm;
480         int                      i;
481
482         (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
483                 r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
484                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
485                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
486         for (i = 0; i < r0->lo_nr; ++i) {
487                 struct lu_object *sub;
488
489                 if (r0->lo_sub[i] != NULL) {
490                         sub = lovsub2lu(r0->lo_sub[i]);
491                         lu_object_print(env, cookie, p, sub);
492                 } else {
493                         (*p)(env, cookie, "sub %d absent\n", i);
494                 }
495         }
496         return 0;
497 }
498
499 static int lov_print_released(const struct lu_env *env, void *cookie,
500                                 lu_printer_t p, const struct lu_object *o)
501 {
502         struct lov_object       *lov = lu2lov(o);
503         struct lov_stripe_md    *lsm = lov->lo_lsm;
504
505         (*p)(env, cookie,
506                 "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
507                 lov->lo_layout_invalid ? "invalid" : "valid", lsm,
508                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
509                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
510         return 0;
511 }
512
513 /**
514  * Implements cl_object_operations::coo_attr_get() method for an object
515  * without stripes (LLT_EMPTY layout type).
516  *
517  * The only attributes this layer is authoritative in this case is
518  * cl_attr::cat_blocks---it's 0.
519  */
520 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
521                               struct cl_attr *attr)
522 {
523         attr->cat_blocks = 0;
524         return 0;
525 }
526
527 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
528                               struct cl_attr *attr)
529 {
530         struct lov_object       *lov = cl2lov(obj);
531         struct lov_layout_raid0 *r0 = lov_r0(lov);
532         struct cl_attr          *lov_attr = &r0->lo_attr;
533         int                      result = 0;
534
535         ENTRY;
536
537         /* this is called w/o holding type guard mutex, so it must be inside
538          * an on going IO otherwise lsm may be replaced.
539          * LU-2117: it turns out there exists one exception. For mmaped files,
540          * the lock of those files may be requested in the other file's IO
541          * context, and this function is called in ccc_lock_state(), it will
542          * hit this assertion.
543          * Anyway, it's still okay to call attr_get w/o type guard as layout
544          * can't go if locks exist. */
545         /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
546
547         if (!r0->lo_attr_valid) {
548                 struct lov_stripe_md    *lsm = lov->lo_lsm;
549                 struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
550                 __u64                    kms = 0;
551
552                 memset(lvb, 0, sizeof(*lvb));
553                 /* XXX: timestamps can be negative by sanity:test_39m,
554                  * how can it be? */
555                 lvb->lvb_atime = LLONG_MIN;
556                 lvb->lvb_ctime = LLONG_MIN;
557                 lvb->lvb_mtime = LLONG_MIN;
558
559                 /*
560                  * XXX that should be replaced with a loop over sub-objects,
561                  * doing cl_object_attr_get() on them. But for now, let's
562                  * reuse old lov code.
563                  */
564
565                 /*
566                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
567                  * happy. It's not needed, because new code uses
568                  * ->coh_attr_guard spin-lock to protect consistency of
569                  * sub-object attributes.
570                  */
571                 lov_stripe_lock(lsm);
572                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
573                 lov_stripe_unlock(lsm);
574                 if (result == 0) {
575                         cl_lvb2attr(lov_attr, lvb);
576                         lov_attr->cat_kms = kms;
577                         r0->lo_attr_valid = 1;
578                 }
579         }
580         if (result == 0) { /* merge results */
581                 attr->cat_blocks = lov_attr->cat_blocks;
582                 attr->cat_size = lov_attr->cat_size;
583                 attr->cat_kms = lov_attr->cat_kms;
584                 if (attr->cat_atime < lov_attr->cat_atime)
585                         attr->cat_atime = lov_attr->cat_atime;
586                 if (attr->cat_ctime < lov_attr->cat_ctime)
587                         attr->cat_ctime = lov_attr->cat_ctime;
588                 if (attr->cat_mtime < lov_attr->cat_mtime)
589                         attr->cat_mtime = lov_attr->cat_mtime;
590         }
591         RETURN(result);
592 }
593
594 const static struct lov_layout_operations lov_dispatch[] = {
595         [LLT_EMPTY] = {
596                 .llo_init      = lov_init_empty,
597                 .llo_delete    = lov_delete_empty,
598                 .llo_fini      = lov_fini_empty,
599                 .llo_install   = lov_install_empty,
600                 .llo_print     = lov_print_empty,
601                 .llo_page_init = lov_page_init_empty,
602                 .llo_lock_init = lov_lock_init_empty,
603                 .llo_io_init   = lov_io_init_empty,
604                 .llo_getattr   = lov_attr_get_empty,
605         },
606         [LLT_RAID0] = {
607                 .llo_init      = lov_init_raid0,
608                 .llo_delete    = lov_delete_raid0,
609                 .llo_fini      = lov_fini_raid0,
610                 .llo_install   = lov_install_raid0,
611                 .llo_print     = lov_print_raid0,
612                 .llo_page_init = lov_page_init_raid0,
613                 .llo_lock_init = lov_lock_init_raid0,
614                 .llo_io_init   = lov_io_init_raid0,
615                 .llo_getattr   = lov_attr_get_raid0,
616         },
617         [LLT_RELEASED] = {
618                 .llo_init      = lov_init_released,
619                 .llo_delete    = lov_delete_empty,
620                 .llo_fini      = lov_fini_released,
621                 .llo_install   = lov_install_empty,
622                 .llo_print     = lov_print_released,
623                 .llo_page_init = lov_page_init_empty,
624                 .llo_lock_init = lov_lock_init_empty,
625                 .llo_io_init   = lov_io_init_released,
626                 .llo_getattr   = lov_attr_get_empty,
627         }
628 };
629
/**
 * Double-dispatch without locking: selects the method table matching the
 * layout type of \a obj and calls its method \a op with the remaining
 * arguments. The expression evaluates to the method's return value.
 *
 * No lock is taken here; lo_type is read as is.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
({                                                                      \
        struct lov_object      *__obj = (obj);                          \
        enum lov_layout_type    __llt = __obj->lo_type;                 \
                                                                        \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
})
642
643 /**
644  * Return lov_layout_type associated with a given lsm
645  */
646 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
647 {
648         if (lsm == NULL)
649                 return LLT_EMPTY;
650         if (lsm_is_released(lsm))
651                 return LLT_RELEASED;
652         return LLT_RAID0;
653 }
654
655 static inline void lov_conf_freeze(struct lov_object *lov)
656 {
657         if (lov->lo_owner != current)
658                 down_read(&lov->lo_type_guard);
659 }
660
661 static inline void lov_conf_thaw(struct lov_object *lov)
662 {
663         if (lov->lo_owner != current)
664                 up_read(&lov->lo_type_guard);
665 }
666
/**
 * Double-dispatch on the layout type of \a obj with optional locking.
 *
 * When \a lock is non-zero the layout type is frozen with
 * lov_conf_freeze() around the call and thawed afterwards. The expression
 * evaluates to the return value of method \a op.
 *
 * \a obj is evaluated exactly once: the dispatch is done on the local copy
 * rather than by expanding another macro on \a obj, so arguments with side
 * effects are safe. lo_type is read only after the freeze.
 */
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
({                                                                      \
        struct lov_object                      *__obj = (obj);          \
        int                                     __lock = !!(lock);      \
        enum lov_layout_type                    __llt;                  \
        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
                                                                        \
        if (__lock)                                                     \
                lov_conf_freeze(__obj);                                 \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        __result = lov_dispatch[__llt].op(__VA_ARGS__);                 \
        if (__lock)                                                     \
                lov_conf_thaw(__obj);                                   \
        __result;                                                       \
})
680
/**
 * Locked double-dispatch based on the layout type of an object: shorthand
 * for LOV_2DISPATCH_MAYLOCK() with locking always enabled.
 */
#define LOV_2DISPATCH(obj, op, ...)                                     \
        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
686
/**
 * Locked double-dispatch for methods returning void. The layout type is
 * frozen with lov_conf_freeze() before lo_type is read and thawed after the
 * method returns. Usable as a single statement.
 */
#define LOV_2DISPATCH_VOID(obj, op, ...)                                \
do {                                                                    \
        struct lov_object      *__obj = (obj);                          \
        enum lov_layout_type    __llt;                                  \
                                                                        \
        lov_conf_freeze(__obj);                                         \
        /* lo_type must only be sampled while frozen */                 \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
        lov_conf_thaw(__obj);                                           \
} while (0)
698
699 static void lov_conf_lock(struct lov_object *lov)
700 {
701         LASSERT(lov->lo_owner != current);
702         down_write(&lov->lo_type_guard);
703         LASSERT(lov->lo_owner == NULL);
704         lov->lo_owner = current;
705 }
706
707 static void lov_conf_unlock(struct lov_object *lov)
708 {
709         lov->lo_owner = NULL;
710         up_write(&lov->lo_type_guard);
711 }
712
713 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
714 {
715         struct l_wait_info lwi = { 0 };
716         ENTRY;
717
718         while (atomic_read(&lov->lo_active_ios) > 0) {
719                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
720                         PFID(lu_object_fid(lov2lu(lov))),
721                         atomic_read(&lov->lo_active_ios));
722
723                 l_wait_event(lov->lo_waitq,
724                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
725         }
726         RETURN(0);
727 }
728
729 static int lov_layout_change(const struct lu_env *unused,
730                              struct lov_object *lov, struct lov_stripe_md *lsm,
731                              const struct cl_object_conf *conf)
732 {
733         enum lov_layout_type llt = lov_type(lsm);
734         union lov_layout_state *state = &lov->u;
735         const struct lov_layout_operations *old_ops;
736         const struct lov_layout_operations *new_ops;
737         void *cookie;
738         struct lu_env *env;
739         __u16 refcheck;
740         int rc;
741         ENTRY;
742
743         LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
744
745         cookie = cl_env_reenter();
746         env = cl_env_get(&refcheck);
747         if (IS_ERR(env)) {
748                 cl_env_reexit(cookie);
749                 RETURN(PTR_ERR(env));
750         }
751
752         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
753
754         CDEBUG(D_INODE, DFID" from %s to %s\n",
755                PFID(lu_object_fid(lov2lu(lov))),
756                llt2str(lov->lo_type), llt2str(llt));
757
758         old_ops = &lov_dispatch[lov->lo_type];
759         new_ops = &lov_dispatch[llt];
760
761         rc = cl_object_prune(env, &lov->lo_cl);
762         if (rc != 0)
763                 GOTO(out, rc);
764
765         rc = old_ops->llo_delete(env, lov, &lov->u);
766         if (rc != 0)
767                 GOTO(out, rc);
768
769         old_ops->llo_fini(env, lov, &lov->u);
770
771         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
772
773         lov->lo_type = LLT_EMPTY;
774
775         /* page bufsize fixup */
776         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
777                 lov_page_slice_fixup(lov, NULL);
778
779         rc = new_ops->llo_init(env, lov_object_dev(lov), lov, lsm, conf, state);
780         if (rc != 0) {
781                 new_ops->llo_delete(env, lov, state);
782                 new_ops->llo_fini(env, lov, state);
783                 /* this file becomes an EMPTY file. */
784                 GOTO(out, rc);
785         }
786
787         new_ops->llo_install(env, lov, state);
788         lov->lo_type = llt;
789
790 out:
791         cl_env_put(env, &refcheck);
792         cl_env_reexit(cookie);
793
794         RETURN(rc);
795 }
796
797 /*****************************************************************************
798  *
799  * Lov object operations.
800  *
801  */
802 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
803                     const struct lu_object_conf *conf)
804 {
805         struct lov_object            *lov   = lu2lov(obj);
806         struct lov_device            *dev   = lov_object_dev(lov);
807         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
808         union lov_layout_state       *set   = &lov->u;
809         const struct lov_layout_operations *ops;
810         struct lov_stripe_md *lsm = NULL;
811         int rc;
812         ENTRY;
813
814         init_rwsem(&lov->lo_type_guard);
815         atomic_set(&lov->lo_active_ios, 0);
816         init_waitqueue_head(&lov->lo_waitq);
817         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
818
819         lov->lo_type = LLT_EMPTY;
820         if (cconf->u.coc_layout.lb_buf != NULL) {
821                 lsm = lov_unpackmd(dev->ld_lov,
822                                    cconf->u.coc_layout.lb_buf,
823                                    cconf->u.coc_layout.lb_len);
824                 if (IS_ERR(lsm))
825                         RETURN(PTR_ERR(lsm));
826         }
827
828         /* no locking is necessary, as object is being created */
829         lov->lo_type = lov_type(lsm);
830         ops = &lov_dispatch[lov->lo_type];
831         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
832         if (rc != 0)
833                 GOTO(out_lsm, rc);
834
835         ops->llo_install(env, lov, set);
836
837 out_lsm:
838         lov_lsm_put(lsm);
839
840         RETURN(rc);
841 }
842
843 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
844                         const struct cl_object_conf *conf)
845 {
846         struct lov_stripe_md    *lsm = NULL;
847         struct lov_object       *lov = cl2lov(obj);
848         int                      result = 0;
849         ENTRY;
850
851         if (conf->coc_opc == OBJECT_CONF_SET &&
852             conf->u.coc_layout.lb_buf != NULL) {
853                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
854                                    conf->u.coc_layout.lb_buf,
855                                    conf->u.coc_layout.lb_len);
856                 if (IS_ERR(lsm))
857                         RETURN(PTR_ERR(lsm));
858         }
859
860         lov_conf_lock(lov);
861         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
862                 lov->lo_layout_invalid = true;
863                 GOTO(out, result = 0);
864         }
865
866         if (conf->coc_opc == OBJECT_CONF_WAIT) {
867                 if (lov->lo_layout_invalid &&
868                     atomic_read(&lov->lo_active_ios) > 0) {
869                         lov_conf_unlock(lov);
870                         result = lov_layout_wait(env, lov);
871                         lov_conf_lock(lov);
872                 }
873                 GOTO(out, result);
874         }
875
876         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
877
878         if ((lsm == NULL && lov->lo_lsm == NULL) ||
879             ((lsm != NULL && lov->lo_lsm != NULL) &&
880              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
881              (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
882                 /* same version of layout */
883                 lov->lo_layout_invalid = false;
884                 GOTO(out, result = 0);
885         }
886
887         /* will change layout - check if there still exists active IO. */
888         if (atomic_read(&lov->lo_active_ios) > 0) {
889                 lov->lo_layout_invalid = true;
890                 GOTO(out, result = -EBUSY);
891         }
892
893         result = lov_layout_change(env, lov, lsm, conf);
894         lov->lo_layout_invalid = result != 0;
895         EXIT;
896
897 out:
898         lov_conf_unlock(lov);
899         lov_lsm_put(lsm);
900         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
901                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
902         RETURN(result);
903 }
904
905 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
906 {
907         struct lov_object *lov = lu2lov(obj);
908
909         ENTRY;
910         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
911         EXIT;
912 }
913
914 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
915 {
916         struct lov_object *lov = lu2lov(obj);
917
918         ENTRY;
919         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
920         lu_object_fini(obj);
921         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
922         EXIT;
923 }
924
925 static int lov_object_print(const struct lu_env *env, void *cookie,
926                             lu_printer_t p, const struct lu_object *o)
927 {
928         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
929 }
930
931 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
932                   struct cl_page *page, pgoff_t index)
933 {
934         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
935                                     index);
936 }
937
938 /**
939  * Implements cl_object_operations::clo_io_init() method for lov
940  * layer. Dispatches to the appropriate layout io initialization method.
941  */
942 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
943                 struct cl_io *io)
944 {
945         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
946         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
947                                      !io->ci_ignore_layout, env, obj, io);
948 }
949
950 /**
951  * An implementation of cl_object_operations::clo_attr_get() method for lov
952  * layer. For raid0 layout this collects and merges attributes of all
953  * sub-objects.
954  */
955 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
956                         struct cl_attr *attr)
957 {
958         /* do not take lock, as this function is called under a
959          * spin-lock. Layout is protected from changing by ongoing IO. */
960         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
961 }
962
/**
 * cl_object_operations::coo_attr_update() for the lov layer.
 *
 * None of the layouts implements an attribute update, so there is nothing
 * to dispatch to and all arguments are ignored.
 *
 * \retval 0    always
 */
static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
                           const struct cl_attr *attr, unsigned valid)
{
        return 0;
}
971
972 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
973                   struct cl_lock *lock, const struct cl_io *io)
974 {
975         /* No need to lock because we've taken one refcount of layout.  */
976         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
977                                     io);
978 }
979
980 /**
981  * We calculate on which OST the mapping will end. If the length of mapping
982  * is greater than (stripe_size * stripe_count) then the last_stripe will
983  * will be one just before start_stripe. Else we check if the mapping
984  * intersects each OST and find last_stripe.
985  * This function returns the last_stripe and also sets the stripe_count
986  * over which the mapping is spread
987  *
988  * \param lsm [in]              striping information for the file
989  * \param fm_start [in]         logical start of mapping
990  * \param fm_end [in]           logical end of mapping
991  * \param start_stripe [in]     starting stripe of the mapping
992  * \param stripe_count [out]    the number of stripes across which to map is
993  *                              returned
994  *
995  * \retval last_stripe          return the last stripe of the mapping
996  */
997 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
998                                    loff_t fm_start, loff_t fm_end,
999                                    int start_stripe, int *stripe_count)
1000 {
1001         int last_stripe;
1002         loff_t obd_start;
1003         loff_t obd_end;
1004         int i, j;
1005
1006         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
1007                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
1008                                                               start_stripe - 1);
1009                 *stripe_count = lsm->lsm_stripe_count;
1010         } else {
1011                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
1012                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
1013                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1014                                                    &obd_start, &obd_end)) == 0)
1015                                 break;
1016                 }
1017                 *stripe_count = j;
1018                 last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count;
1019         }
1020
1021         return last_stripe;
1022 }
1023
1024 /**
1025  * Set fe_device and copy extents from local buffer into main return buffer.
1026  *
1027  * \param fiemap [out]          fiemap to hold all extents
1028  * \param lcl_fm_ext [in]       array of fiemap extents get from OSC layer
1029  * \param ost_index [in]        OST index to be written into the fm_device
1030  *                              field for each extent
1031  * \param ext_count [in]        number of extents to be copied
1032  * \param current_extent [in]   where to start copying in the extent array
1033  */
1034 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1035                                          struct fiemap_extent *lcl_fm_ext,
1036                                          int ost_index, unsigned int ext_count,
1037                                          int current_extent)
1038 {
1039         char            *to;
1040         unsigned int    ext;
1041
1042         for (ext = 0; ext < ext_count; ext++) {
1043                 lcl_fm_ext[ext].fe_device = ost_index;
1044                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1045         }
1046
1047         /* Copy fm_extent's from fm_local to return buffer */
1048         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1049         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
1050 }
1051
1052 #define FIEMAP_BUFFER_SIZE 4096
1053
1054 /**
1055  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1056  * call. The local end offset and the device are sent in the first
1057  * fm_extent. This function calculates the stripe number from the index.
1058  * This function returns a stripe_no on which mapping is to be restarted.
1059  *
1060  * This function returns fm_end_offset which is the in-OST offset at which
1061  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1062  * will re-calculate proper offset in next stripe.
1063  * Note that the first extent is passed to lov_get_info via the value field.
1064  *
1065  * \param fiemap [in]           fiemap request header
1066  * \param lsm [in]              striping information for the file
1067  * \param fm_start [in]         logical start of mapping
1068  * \param fm_end [in]           logical end of mapping
1069  * \param start_stripe [out]    starting stripe will be returned in this
1070  */
1071 static loff_t fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1072                                         struct lov_stripe_md *lsm,
1073                                         loff_t fm_start, loff_t fm_end,
1074                                         int *start_stripe)
1075 {
1076         loff_t local_end = fiemap->fm_extents[0].fe_logical;
1077         loff_t lun_start;
1078         loff_t lun_end;
1079         loff_t fm_end_offset;
1080         int stripe_no = -1;
1081         int i;
1082
1083         if (fiemap->fm_extent_count == 0 ||
1084             fiemap->fm_extents[0].fe_logical == 0)
1085                 return 0;
1086
1087         /* Find out stripe_no from ost_index saved in the fe_device */
1088         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1089                 struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
1090
1091                 if (lov_oinfo_is_dummy(oinfo))
1092                         continue;
1093
1094                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1095                         stripe_no = i;
1096                         break;
1097                 }
1098         }
1099
1100         if (stripe_no == -1)
1101                 return -EINVAL;
1102
1103         /* If we have finished mapping on previous device, shift logical
1104          * offset to start of next device */
1105         if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1106                                   &lun_start, &lun_end) != 0 &&
1107             local_end < lun_end) {
1108                 fm_end_offset = local_end;
1109                 *start_stripe = stripe_no;
1110         } else {
1111                 /* This is a special value to indicate that caller should
1112                  * calculate offset in next stripe. */
1113                 fm_end_offset = 0;
1114                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
1115         }
1116
1117         return fm_end_offset;
1118 }
1119
1120 /**
1121  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1122  * This also handles the restarting of FIEMAP calls in case mapping overflows
1123  * the available number of extents in single call.
1124  *
1125  * \param env [in]              lustre environment
1126  * \param obj [in]              file object
1127  * \param fmkey [in]            fiemap request header and other info
1128  * \param fiemap [out]          fiemap buffer holding retrived map extents
1129  * \param buflen [in/out]       max buffer length of @fiemap, when iterate
1130  *                              each OST, it is used to limit max map needed
1131  * \retval 0    success
1132  * \retval < 0  error
1133  */
1134 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1135                              struct ll_fiemap_info_key *fmkey,
1136                              struct fiemap *fiemap, size_t *buflen)
1137 {
1138         struct lov_stripe_md    *lsm;
1139         struct cl_object        *subobj = NULL;
1140         struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1141         struct fiemap           *fm_local = NULL;
1142         struct fiemap_extent    *lcl_fm_ext;
1143         loff_t                  fm_start;
1144         loff_t                  fm_end;
1145         loff_t                  fm_length;
1146         loff_t                  fm_end_offset;
1147         int                     count_local;
1148         int                     ost_index = 0;
1149         int                     start_stripe;
1150         int                     current_extent = 0;
1151         int                     rc = 0;
1152         int                     last_stripe;
1153         int                     cur_stripe = 0;
1154         int                     cur_stripe_wrap = 0;
1155         int                     stripe_count;
1156         unsigned int            buffer_size = FIEMAP_BUFFER_SIZE;
1157         /* Whether have we collected enough extents */
1158         bool                    enough = false;
1159         /* EOF for object */
1160         bool                    ost_eof = false;
1161         /* done with required mapping for this OST? */
1162         bool                    ost_done = false;
1163         ENTRY;
1164
1165         lsm = lov_lsm_addref(cl2lov(obj));
1166         if (lsm == NULL)
1167                 RETURN(-ENODATA);
1168
1169         /**
1170          * If the stripe_count > 1 and the application does not understand
1171          * DEVICE_ORDER flag, it cannot interpret the extents correctly.
1172          */
1173         if (lsm->lsm_stripe_count > 1 && !(fiemap->fm_flags &
1174                                            FIEMAP_FLAG_DEVICE_ORDER))
1175                 GOTO(out_lsm, rc = -ENOTSUPP);
1176
1177         if (lsm_is_released(lsm)) {
1178                 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1179                         /**
1180                          * released file, return a minimal FIEMAP if
1181                          * request fits in file-size.
1182                          */
1183                         fiemap->fm_mapped_extents = 1;
1184                         fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1185                         if (fiemap->fm_start + fiemap->fm_length <
1186                             fmkey->lfik_oa.o_size)
1187                                 fiemap->fm_extents[0].fe_length =
1188                                         fiemap->fm_length;
1189                         else
1190                                 fiemap->fm_extents[0].fe_length =
1191                                         fmkey->lfik_oa.o_size -
1192                                         fiemap->fm_start;
1193                         fiemap->fm_extents[0].fe_flags |=
1194                                 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1195                 }
1196                 GOTO(out_lsm, rc = 0);
1197         }
1198
1199         if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1200                 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1201
1202         OBD_ALLOC_LARGE(fm_local, buffer_size);
1203         if (fm_local == NULL)
1204                 GOTO(out_lsm, rc = -ENOMEM);
1205         lcl_fm_ext = &fm_local->fm_extents[0];
1206         count_local = fiemap_size_to_count(buffer_size);
1207
1208         fm_start = fiemap->fm_start;
1209         fm_length = fiemap->fm_length;
1210         /* Calculate start stripe, last stripe and length of mapping */
1211         start_stripe = lov_stripe_number(lsm, fm_start);
1212         fm_end = (fm_length == ~0ULL) ? fmkey->lfik_oa.o_size :
1213                                         fm_start + fm_length - 1;
1214         /* If fm_length != ~0ULL but fm_start_fm_length-1 exceeds file size */
1215         if (fm_end > fmkey->lfik_oa.o_size)
1216                 fm_end = fmkey->lfik_oa.o_size;
1217
1218         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
1219                                               start_stripe, &stripe_count);
1220         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
1221                                                   &start_stripe);
1222         if (fm_end_offset == -EINVAL)
1223                 GOTO(out_fm_local, rc = -EINVAL);
1224
1225         /**
1226          * Requested extent count exceeds the fiemap buffer size, shrink our
1227          * ambition.
1228          */
1229         if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1230                 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1231         if (fiemap->fm_extent_count == 0)
1232                 count_local = 0;
1233
1234         /* Check each stripe */
1235         for (cur_stripe = start_stripe; stripe_count > 0;
1236              --stripe_count,
1237              cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
1238                 loff_t req_fm_len; /* Stores length of required mapping */
1239                 loff_t len_mapped_single_call;
1240                 loff_t lun_start;
1241                 loff_t lun_end;
1242                 loff_t obd_object_end;
1243                 unsigned int ext_count;
1244
1245                 cur_stripe_wrap = cur_stripe;
1246
1247                 /* Find out range of mapping on this stripe */
1248                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
1249                                            &lun_start, &obd_object_end)) == 0)
1250                         continue;
1251
1252                 if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe]))
1253                         GOTO(out_fm_local, rc = -EIO);
1254
1255                 /* If this is a continuation FIEMAP call and we are on
1256                  * starting stripe then lun_start needs to be set to
1257                  * fm_end_offset */
1258                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
1259                         lun_start = fm_end_offset;
1260
1261                 if (fm_length != ~0ULL) {
1262                         /* Handle fm_start + fm_length overflow */
1263                         if (fm_start + fm_length < fm_start)
1264                                 fm_length = ~0ULL - fm_start;
1265                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
1266                                                      cur_stripe);
1267                 } else {
1268                         lun_end = ~0ULL;
1269                 }
1270
1271                 if (lun_start == lun_end)
1272                         continue;
1273
1274                 req_fm_len = obd_object_end - lun_start;
1275                 fm_local->fm_length = 0;
1276                 len_mapped_single_call = 0;
1277
1278                 /* find lobsub object */
1279                 subobj = lov_find_subobj(env, cl2lov(obj), lsm,
1280                                              cur_stripe);
1281                 if (IS_ERR(subobj))
1282                         GOTO(out_fm_local, rc = PTR_ERR(subobj));
1283                 /* If the output buffer is very large and the objects have many
1284                  * extents we may need to loop on a single OST repeatedly */
1285                 ost_eof = false;
1286                 ost_done = false;
1287                 do {
1288                         if (fiemap->fm_extent_count > 0) {
1289                                 /* Don't get too many extents. */
1290                                 if (current_extent + count_local >
1291                                     fiemap->fm_extent_count)
1292                                         count_local = fiemap->fm_extent_count -
1293                                                       current_extent;
1294                         }
1295
1296                         lun_start += len_mapped_single_call;
1297                         fm_local->fm_length = req_fm_len -
1298                                               len_mapped_single_call;
1299                         req_fm_len = fm_local->fm_length;
1300                         fm_local->fm_extent_count = enough ? 1 : count_local;
1301                         fm_local->fm_mapped_extents = 0;
1302                         fm_local->fm_flags = fiemap->fm_flags;
1303
1304                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
1305
1306                         if (ost_index < 0 ||
1307                             ost_index >= lov->desc.ld_tgt_count)
1308                                 GOTO(obj_put, rc = -EINVAL);
1309                         /* If OST is inactive, return extent with UNKNOWN
1310                          * flag. */
1311                         if (!lov->lov_tgts[ost_index]->ltd_active) {
1312                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
1313                                 fm_local->fm_mapped_extents = 1;
1314
1315                                 lcl_fm_ext[0].fe_logical = lun_start;
1316                                 lcl_fm_ext[0].fe_length = obd_object_end -
1317                                                           lun_start;
1318                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1319
1320                                 goto inactive_tgt;
1321                         }
1322
1323                         fm_local->fm_start = lun_start;
1324                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1325                         memcpy(&fmkey->lfik_fiemap, fm_local,
1326                                sizeof(*fm_local));
1327                         *buflen = fiemap_count_to_size(
1328                                                 fm_local->fm_extent_count);
1329
1330                         rc = cl_object_fiemap(env, subobj, fmkey, fm_local,
1331                                               buflen);
1332                         if (rc != 0)
1333                                 GOTO(obj_put, rc);
1334 inactive_tgt:
1335                         ext_count = fm_local->fm_mapped_extents;
1336                         if (ext_count == 0) {
1337                                 ost_done = true;
1338                                 /* If last stripe has hold at the end,
1339                                  * we need to return */
1340                                 if (cur_stripe_wrap == last_stripe) {
1341                                         fiemap->fm_mapped_extents = 0;
1342                                         goto finish;
1343                                 }
1344                                 break;
1345                         } else if (enough) {
1346                                 /*
1347                                  * We've collected enough extents and there are
1348                                  * more extents after it.
1349                                  */
1350                                 goto finish;
1351                         }
1352
1353                         /* If we just need num of extents, got to next device */
1354                         if (fiemap->fm_extent_count == 0) {
1355                                 current_extent += ext_count;
1356                                 break;
1357                         }
1358
1359                         /* prepare to copy retrived map extents */
1360                         len_mapped_single_call =
1361                                 lcl_fm_ext[ext_count - 1].fe_logical -
1362                                 lun_start + lcl_fm_ext[ext_count - 1].fe_length;
1363
1364                         /* Have we finished mapping on this device? */
1365                         if (req_fm_len <= len_mapped_single_call)
1366                                 ost_done = true;
1367
1368                         /* Clear the EXTENT_LAST flag which can be present on
1369                          * the last extent */
1370                         if (lcl_fm_ext[ext_count - 1].fe_flags &
1371                             FIEMAP_EXTENT_LAST)
1372                                 lcl_fm_ext[ext_count - 1].fe_flags &=
1373                                                         ~FIEMAP_EXTENT_LAST;
1374                         if (lov_stripe_size(lsm,
1375                                         lcl_fm_ext[ext_count - 1].fe_logical +
1376                                         lcl_fm_ext[ext_count - 1].fe_length,
1377                                         cur_stripe) >= fmkey->lfik_oa.o_size)
1378                                 ost_eof = true;
1379
1380                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
1381                                                      ost_index, ext_count,
1382                                                      current_extent);
1383                         current_extent += ext_count;
1384
1385                         /* Ran out of available extents? */
1386                         if (current_extent >= fiemap->fm_extent_count)
1387                                 enough = true;
1388                 } while (!ost_done && !ost_eof);
1389
1390                 cl_object_put(env, subobj);
1391                 subobj = NULL;
1392
1393                 if (cur_stripe_wrap == last_stripe)
1394                         goto finish;
1395         } /* for each stripe */
1396 finish:
1397         /* Indicate that we are returning device offsets unless file just has
1398          * single stripe */
1399         if (lsm->lsm_stripe_count > 1)
1400                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1401
1402         if (fiemap->fm_extent_count == 0)
1403                 goto skip_last_device_calc;
1404
1405         /* Check if we have reached the last stripe and whether mapping for that
1406          * stripe is done. */
1407         if ((cur_stripe_wrap == last_stripe) && (ost_done || ost_eof))
1408                 fiemap->fm_extents[current_extent - 1].fe_flags |=
1409                                                              FIEMAP_EXTENT_LAST;
1410 skip_last_device_calc:
1411         fiemap->fm_mapped_extents = current_extent;
1412 obj_put:
1413         if (subobj != NULL)
1414                 cl_object_put(env, subobj);
1415 out_fm_local:
1416         OBD_FREE_LARGE(fm_local, buffer_size);
1417
1418 out_lsm:
1419         lov_lsm_put(lsm);
1420
1421         return rc;
1422 }
1423
1424 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1425                                 struct lov_user_md __user *lum)
1426 {
1427         struct lov_object       *lov = cl2lov(obj);
1428         struct lov_stripe_md    *lsm;
1429         int                     rc = 0;
1430         ENTRY;
1431
1432         lsm = lov_lsm_addref(lov);
1433         if (lsm == NULL)
1434                 RETURN(-ENODATA);
1435
1436         rc = lov_getstripe(cl2lov(obj), lsm, lum);
1437         lov_lsm_put(lsm);
1438         RETURN(rc);
1439 }
1440
1441 static int lov_object_layout_get(const struct lu_env *env,
1442                                  struct cl_object *obj,
1443                                  struct cl_layout *cl)
1444 {
1445         struct lov_object *lov = cl2lov(obj);
1446         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1447         struct lu_buf *buf = &cl->cl_buf;
1448         ssize_t rc;
1449         ENTRY;
1450
1451         if (lsm == NULL) {
1452                 cl->cl_size = 0;
1453                 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1454
1455                 RETURN(0);
1456         }
1457
1458         cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
1459         cl->cl_layout_gen = lsm->lsm_layout_gen;
1460
1461         rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
1462         lov_lsm_put(lsm);
1463
1464         RETURN(rc < 0 ? rc : 0);
1465 }
1466
1467 static loff_t lov_object_maxbytes(struct cl_object *obj)
1468 {
1469         struct lov_object *lov = cl2lov(obj);
1470         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1471         loff_t maxbytes;
1472
1473         if (lsm == NULL)
1474                 return LLONG_MAX;
1475
1476         maxbytes = lsm->lsm_maxbytes;
1477
1478         lov_lsm_put(lsm);
1479
1480         return maxbytes;
1481 }
1482
1483 static const struct cl_object_operations lov_ops = {
1484         .coo_page_init    = lov_page_init,
1485         .coo_lock_init    = lov_lock_init,
1486         .coo_io_init      = lov_io_init,
1487         .coo_attr_get     = lov_attr_get,
1488         .coo_attr_update  = lov_attr_update,
1489         .coo_conf_set     = lov_conf_set,
1490         .coo_getstripe    = lov_object_getstripe,
1491         .coo_layout_get   = lov_object_layout_get,
1492         .coo_maxbytes     = lov_object_maxbytes,
1493         .coo_fiemap       = lov_object_fiemap,
1494 };
1495
1496 static const struct lu_object_operations lov_lu_obj_ops = {
1497         .loo_object_init      = lov_object_init,
1498         .loo_object_delete    = lov_object_delete,
1499         .loo_object_release   = NULL,
1500         .loo_object_free      = lov_object_free,
1501         .loo_object_print     = lov_object_print,
1502         .loo_object_invariant = NULL
1503 };
1504
1505 struct lu_object *lov_object_alloc(const struct lu_env *env,
1506                                    const struct lu_object_header *unused,
1507                                    struct lu_device *dev)
1508 {
1509         struct lov_object *lov;
1510         struct lu_object  *obj;
1511
1512         ENTRY;
1513         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1514         if (lov != NULL) {
1515                 obj = lov2lu(lov);
1516                 lu_object_init(obj, NULL, dev);
1517                 lov->lo_cl.co_ops = &lov_ops;
1518                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1519                 /*
1520                  * object io operation vector (cl_object::co_iop) is installed
1521                  * later in lov_object_init(), as different vectors are used
1522                  * for object with different layouts.
1523                  */
1524                 obj->lo_ops = &lov_lu_obj_ops;
1525         } else
1526                 obj = NULL;
1527         RETURN(obj);
1528 }
1529
1530 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1531 {
1532         struct lov_stripe_md *lsm = NULL;
1533
1534         lov_conf_freeze(lov);
1535         if (lov->lo_lsm != NULL) {
1536                 lsm = lsm_addref(lov->lo_lsm);
1537                 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1538                         lsm, atomic_read(&lsm->lsm_refc),
1539                         lov->lo_layout_invalid, current);
1540         }
1541         lov_conf_thaw(lov);
1542         return lsm;
1543 }
1544
1545 int lov_read_and_clear_async_rc(struct cl_object *clob)
1546 {
1547         struct lu_object *luobj;
1548         int rc = 0;
1549         ENTRY;
1550
1551         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1552                                  &lov_device_type);
1553         if (luobj != NULL) {
1554                 struct lov_object *lov = lu2lov(luobj);
1555
1556                 lov_conf_freeze(lov);
1557                 switch (lov->lo_type) {
1558                 case LLT_RAID0: {
1559                         struct lov_stripe_md *lsm;
1560                         int i;
1561
1562                         lsm = lov->lo_lsm;
1563                         LASSERT(lsm != NULL);
1564                         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1565                                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1566
1567                                 if (lov_oinfo_is_dummy(loi))
1568                                         continue;
1569
1570                                 if (loi->loi_ar.ar_rc && !rc)
1571                                         rc = loi->loi_ar.ar_rc;
1572                                 loi->loi_ar.ar_rc = 0;
1573                         }
1574                 }
1575                 case LLT_RELEASED:
1576                 case LLT_EMPTY:
1577                         break;
1578                 default:
1579                         LBUG();
1580                 }
1581                 lov_conf_thaw(lov);
1582         }
1583         RETURN(rc);
1584 }
1585 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1586
1587 /** @} lov */