Whamcloud - gitweb
1f2cccaa933da6854ec5c0f6683e73a04f19d33f
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45
46 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
47 {
48         return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
49 }
50
51 /** \addtogroup lov
52  *  @{
53  */
54
55 /*****************************************************************************
56  *
57  * Layout operations.
58  *
59  */
60
61 struct lov_layout_operations {
62         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
63                         struct lov_object *lov, struct lov_stripe_md *lsm,
64                         const struct cl_object_conf *conf,
65                         union lov_layout_state *state);
66         int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
67                            union lov_layout_state *state);
68         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
69                          union lov_layout_state *state);
70         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
71                             union lov_layout_state *state);
72         int  (*llo_print)(const struct lu_env *env, void *cookie,
73                           lu_printer_t p, const struct lu_object *o);
74         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
75                               struct cl_page *page, pgoff_t index);
76         int  (*llo_lock_init)(const struct lu_env *env,
77                               struct cl_object *obj, struct cl_lock *lock,
78                               const struct cl_io *io);
79         int  (*llo_io_init)(const struct lu_env *env,
80                             struct cl_object *obj, struct cl_io *io);
81         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
82                             struct cl_attr *attr);
83         int  (*llo_find_cbdata)(const struct lu_env *env, struct cl_object *obj,
84                                 ldlm_iterator_t iter, void *data);
85 };
86
/* Forward declaration: used by the llo_delete methods defined below. */
static int lov_layout_wait(const struct lu_env *env,
                           struct lov_object *lov);
88
89 static void lov_lsm_put(struct lov_stripe_md *lsm)
90 {
91         if (lsm != NULL)
92                 lov_free_memmd(&lsm);
93 }
94
95 /*****************************************************************************
96  *
97  * Lov object layout operations.
98  *
99  */
100
/**
 * llo_install method for layouts without stripe objects (used by the
 * LLT_EMPTY and LLT_RELEASED dispatch entries). Nothing to install.
 */
static void lov_install_empty(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* File without objects: intentionally a no-op. */
}
109
/**
 * llo_init method for LLT_EMPTY: there is no layout state to build, so this
 * always succeeds.
 *
 * \retval 0 always
 */
static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
                          union lov_layout_state *state)
{
        /* No stripes, nothing to initialize. */
        return 0;
}
117
/**
 * llo_install method for LLT_RAID0. Currently has nothing to do.
 */
static void lov_install_raid0(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* Intentionally empty. */
}
123
124 static struct cl_object *lov_sub_find(const struct lu_env *env,
125                                       struct cl_device *dev,
126                                       const struct lu_fid *fid,
127                                       const struct cl_object_conf *conf)
128 {
129         struct lu_object *o;
130
131         ENTRY;
132         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
133         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
134         RETURN(lu2cl(o));
135 }
136
137 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
138                         struct cl_object *stripe, struct lov_layout_raid0 *r0,
139                         int idx)
140 {
141         struct cl_object_header *hdr;
142         struct cl_object_header *subhdr;
143         struct cl_object_header *parent;
144         struct lov_oinfo        *oinfo;
145         int result;
146
147         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
148                 /* For sanity:test_206.
149                  * Do not leave the object in cache to avoid accessing
150                  * freed memory. This is because osc_object is referring to
151                  * lov_oinfo of lsm_stripe_data which will be freed due to
152                  * this failure. */
153                 cl_object_kill(env, stripe);
154                 cl_object_put(env, stripe);
155                 return -EIO;
156         }
157
158         hdr    = cl_object_header(lov2cl(lov));
159         subhdr = cl_object_header(stripe);
160
161         oinfo = lov->lo_lsm->lsm_oinfo[idx];
162         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
163                " idx: %d gen: %d\n",
164                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
165                PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
166                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
167
168         /* reuse ->coh_attr_guard to protect coh_parent change */
169         spin_lock(&subhdr->coh_attr_guard);
170         parent = subhdr->coh_parent;
171         if (parent == NULL) {
172                 subhdr->coh_parent = hdr;
173                 spin_unlock(&subhdr->coh_attr_guard);
174                 subhdr->coh_nesting = hdr->coh_nesting + 1;
175                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
176                 r0->lo_sub[idx] = cl2lovsub(stripe);
177                 r0->lo_sub[idx]->lso_super = lov;
178                 r0->lo_sub[idx]->lso_index = idx;
179                 result = 0;
180         } else {
181                 struct lu_object  *old_obj;
182                 struct lov_object *old_lov;
183                 unsigned int mask = D_INODE;
184
185                 spin_unlock(&subhdr->coh_attr_guard);
186                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
187                 LASSERT(old_obj != NULL);
188                 old_lov = cl2lov(lu2cl(old_obj));
189                 if (old_lov->lo_layout_invalid) {
190                         /* the object's layout has already changed but isn't
191                          * refreshed */
192                         lu_object_unhash(env, &stripe->co_lu);
193                         result = -EAGAIN;
194                 } else {
195                         mask = D_ERROR;
196                         result = -EIO;
197                 }
198
199                 LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
200                                 "stripe %d is already owned.\n", idx);
201                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
202                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
203                 cl_object_put(env, stripe);
204         }
205         return result;
206 }
207
208 static int lov_page_slice_fixup(struct lov_object *lov,
209                                 struct cl_object *stripe)
210 {
211         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
212         struct cl_object *o;
213
214         if (stripe == NULL)
215                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
216                        cfs_size_round(sizeof(struct lov_page));
217
218         cl_object_for_each(o, stripe)
219                 o->co_slice_off += hdr->coh_page_bufsize;
220
221         return cl_object_header(stripe)->coh_page_bufsize;
222 }
223
224 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
225                           struct lov_object *lov, struct lov_stripe_md *lsm,
226                           const struct cl_object_conf *conf,
227                           union lov_layout_state *state)
228 {
229         int result;
230         int i;
231
232         struct cl_object        *stripe;
233         struct lov_thread_info  *lti     = lov_env_info(env);
234         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
235         struct lu_fid           *ofid    = &lti->lti_fid;
236         struct lov_layout_raid0 *r0      = &state->raid0;
237
238         ENTRY;
239
240         if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
241                 dump_lsm(D_ERROR, lsm);
242                 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
243                          LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
244         }
245
246         LASSERT(lov->lo_lsm == NULL);
247         lov->lo_lsm = lsm_addref(lsm);
248         r0->lo_nr = lsm->lsm_stripe_count;
249         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
250
251         lov->lo_layout_invalid = true;
252
253         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
254         if (r0->lo_sub != NULL) {
255                 int psz = 0;
256
257                 result = 0;
258                 subconf->coc_inode = conf->coc_inode;
259                 spin_lock_init(&r0->lo_sub_lock);
260                 /*
261                  * Create stripe cl_objects.
262                  */
263                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
264                         struct cl_device *subdev;
265                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
266                         int ost_idx = oinfo->loi_ost_idx;
267
268                         if (lov_oinfo_is_dummy(oinfo))
269                                 continue;
270
271                         result = ostid_to_fid(ofid, &oinfo->loi_oi,
272                                               oinfo->loi_ost_idx);
273                         if (result != 0)
274                                 GOTO(out, result);
275
276                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
277                         subconf->u.coc_oinfo = oinfo;
278                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
279                         /* In the function below, .hs_keycmp resolves to
280                          * lu_obj_hop_keycmp() */
281                         /* coverity[overrun-buffer-val] */
282                         stripe = lov_sub_find(env, subdev, ofid, subconf);
283                         if (!IS_ERR(stripe)) {
284                                 result = lov_init_sub(env, lov, stripe, r0, i);
285                                 if (result == -EAGAIN) { /* try again */
286                                         --i;
287                                         result = 0;
288                                         continue;
289                                 }
290                         } else {
291                                 result = PTR_ERR(stripe);
292                         }
293
294                         if (result == 0) {
295                                 int sz = lov_page_slice_fixup(lov, stripe);
296                                 LASSERT(ergo(psz > 0, psz == sz));
297                                 psz = sz;
298                         }
299                 }
300                 if (result == 0)
301                         cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
302         } else
303                 result = -ENOMEM;
304 out:
305         RETURN(result);
306 }
307
308 static int lov_init_released(const struct lu_env *env,
309                              struct lov_device *dev, struct lov_object *lov,
310                              struct lov_stripe_md *lsm,
311                              const struct cl_object_conf *conf,
312                              union lov_layout_state *state)
313 {
314         LASSERT(lsm != NULL);
315         LASSERT(lsm_is_released(lsm));
316         LASSERT(lov->lo_lsm == NULL);
317
318         lov->lo_lsm = lsm_addref(lsm);
319         return 0;
320 }
321
322 static struct cl_object *lov_find_subobj(const struct lu_env *env,
323                                          struct lov_object *lov,
324                                          struct lov_stripe_md *lsm,
325                                          int stripe_idx)
326 {
327         struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
328         struct lov_oinfo        *oinfo = lsm->lsm_oinfo[stripe_idx];
329         struct lov_thread_info  *lti = lov_env_info(env);
330         struct lu_fid           *ofid = &lti->lti_fid;
331         struct cl_device        *subdev;
332         int                     ost_idx;
333         int                     rc;
334         struct cl_object        *result;
335
336         if (lov->lo_type != LLT_RAID0)
337                 GOTO(out, result = NULL);
338
339         ost_idx = oinfo->loi_ost_idx;
340         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
341         if (rc != 0)
342                 GOTO(out, result = NULL);
343
344         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
345         result = lov_sub_find(env, subdev, ofid, NULL);
346 out:
347         if (result == NULL)
348                 result = ERR_PTR(-EINVAL);
349         return result;
350 }
351
352 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
353                             union lov_layout_state *state)
354 {
355         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
356
357         lov_layout_wait(env, lov);
358         return 0;
359 }
360
361 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
362                                struct lovsub_object *los, int idx)
363 {
364         struct cl_object        *sub;
365         struct lov_layout_raid0 *r0;
366         struct lu_site          *site;
367         struct lu_site_bkt_data *bkt;
368         wait_queue_t          *waiter;
369
370         r0  = &lov->u.raid0;
371         LASSERT(r0->lo_sub[idx] == los);
372
373         sub  = lovsub2cl(los);
374         site = sub->co_lu.lo_dev->ld_site;
375         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
376
377         cl_object_kill(env, sub);
378         /* release a reference to the sub-object and ... */
379         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
380         cl_object_put(env, sub);
381
382         /* ... wait until it is actually destroyed---sub-object clears its
383          * ->lo_sub[] slot in lovsub_object_fini() */
384         if (r0->lo_sub[idx] == los) {
385                 waiter = &lov_env_info(env)->lti_waiter;
386                 init_waitqueue_entry(waiter, current);
387                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
388                 set_current_state(TASK_UNINTERRUPTIBLE);
389                 while (1) {
390                         /* this wait-queue is signaled at the end of
391                          * lu_object_free(). */
392                         set_current_state(TASK_UNINTERRUPTIBLE);
393                         spin_lock(&r0->lo_sub_lock);
394                         if (r0->lo_sub[idx] == los) {
395                                 spin_unlock(&r0->lo_sub_lock);
396                                 schedule();
397                         } else {
398                                 spin_unlock(&r0->lo_sub_lock);
399                                 set_current_state(TASK_RUNNING);
400                                 break;
401                         }
402                 }
403                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
404         }
405         LASSERT(r0->lo_sub[idx] == NULL);
406 }
407
408 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
409                             union lov_layout_state *state)
410 {
411         struct lov_layout_raid0 *r0 = &state->raid0;
412         struct lov_stripe_md    *lsm = lov->lo_lsm;
413         int i;
414
415         ENTRY;
416
417         dump_lsm(D_INODE, lsm);
418
419         lov_layout_wait(env, lov);
420         if (r0->lo_sub != NULL) {
421                 for (i = 0; i < r0->lo_nr; ++i) {
422                         struct lovsub_object *los = r0->lo_sub[i];
423
424                         if (los != NULL) {
425                                 cl_object_prune(env, &los->lso_cl);
426                                 /*
427                                  * If top-level object is to be evicted from
428                                  * the cache, so are its sub-objects.
429                                  */
430                                 lov_subobject_kill(env, lov, los, i);
431                         }
432                 }
433         }
434         RETURN(0);
435 }
436
437 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
438                            union lov_layout_state *state)
439 {
440         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
441 }
442
443 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
444                            union lov_layout_state *state)
445 {
446         struct lov_layout_raid0 *r0 = &state->raid0;
447         ENTRY;
448
449         if (r0->lo_sub != NULL) {
450                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
451                 r0->lo_sub = NULL;
452         }
453
454         dump_lsm(D_INODE, lov->lo_lsm);
455         lov_free_memmd(&lov->lo_lsm);
456
457         EXIT;
458 }
459
460 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
461                                 union lov_layout_state *state)
462 {
463         ENTRY;
464         dump_lsm(D_INODE, lov->lo_lsm);
465         lov_free_memmd(&lov->lo_lsm);
466         EXIT;
467 }
468
469 static int lov_print_empty(const struct lu_env *env, void *cookie,
470                            lu_printer_t p, const struct lu_object *o)
471 {
472         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
473         return 0;
474 }
475
476 static int lov_print_raid0(const struct lu_env *env, void *cookie,
477                            lu_printer_t p, const struct lu_object *o)
478 {
479         struct lov_object       *lov = lu2lov(o);
480         struct lov_layout_raid0 *r0  = lov_r0(lov);
481         struct lov_stripe_md    *lsm = lov->lo_lsm;
482         int                      i;
483
484         (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
485                 r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
486                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
487                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
488         for (i = 0; i < r0->lo_nr; ++i) {
489                 struct lu_object *sub;
490
491                 if (r0->lo_sub[i] != NULL) {
492                         sub = lovsub2lu(r0->lo_sub[i]);
493                         lu_object_print(env, cookie, p, sub);
494                 } else {
495                         (*p)(env, cookie, "sub %d absent\n", i);
496                 }
497         }
498         return 0;
499 }
500
501 static int lov_print_released(const struct lu_env *env, void *cookie,
502                                 lu_printer_t p, const struct lu_object *o)
503 {
504         struct lov_object       *lov = lu2lov(o);
505         struct lov_stripe_md    *lsm = lov->lo_lsm;
506
507         (*p)(env, cookie,
508                 "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
509                 lov->lo_layout_invalid ? "invalid" : "valid", lsm,
510                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
511                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
512         return 0;
513 }
514
515 /**
516  * Implements cl_object_operations::coo_attr_get() method for an object
517  * without stripes (LLT_EMPTY layout type).
518  *
519  * The only attributes this layer is authoritative in this case is
520  * cl_attr::cat_blocks---it's 0.
521  */
522 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
523                               struct cl_attr *attr)
524 {
525         attr->cat_blocks = 0;
526         return 0;
527 }
528
529 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
530                               struct cl_attr *attr)
531 {
532         struct lov_object       *lov = cl2lov(obj);
533         struct lov_layout_raid0 *r0 = lov_r0(lov);
534         struct cl_attr          *lov_attr = &r0->lo_attr;
535         int                      result = 0;
536
537         ENTRY;
538
539         /* this is called w/o holding type guard mutex, so it must be inside
540          * an on going IO otherwise lsm may be replaced.
541          * LU-2117: it turns out there exists one exception. For mmaped files,
542          * the lock of those files may be requested in the other file's IO
543          * context, and this function is called in ccc_lock_state(), it will
544          * hit this assertion.
545          * Anyway, it's still okay to call attr_get w/o type guard as layout
546          * can't go if locks exist. */
547         /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
548
549         if (!r0->lo_attr_valid) {
550                 struct lov_stripe_md    *lsm = lov->lo_lsm;
551                 struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
552                 __u64                    kms = 0;
553
554                 memset(lvb, 0, sizeof(*lvb));
555                 /* XXX: timestamps can be negative by sanity:test_39m,
556                  * how can it be? */
557                 lvb->lvb_atime = LLONG_MIN;
558                 lvb->lvb_ctime = LLONG_MIN;
559                 lvb->lvb_mtime = LLONG_MIN;
560
561                 /*
562                  * XXX that should be replaced with a loop over sub-objects,
563                  * doing cl_object_attr_get() on them. But for now, let's
564                  * reuse old lov code.
565                  */
566
567                 /*
568                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
569                  * happy. It's not needed, because new code uses
570                  * ->coh_attr_guard spin-lock to protect consistency of
571                  * sub-object attributes.
572                  */
573                 lov_stripe_lock(lsm);
574                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
575                 lov_stripe_unlock(lsm);
576                 if (result == 0) {
577                         cl_lvb2attr(lov_attr, lvb);
578                         lov_attr->cat_kms = kms;
579                         r0->lo_attr_valid = 1;
580                 }
581         }
582         if (result == 0) { /* merge results */
583                 attr->cat_blocks = lov_attr->cat_blocks;
584                 attr->cat_size = lov_attr->cat_size;
585                 attr->cat_kms = lov_attr->cat_kms;
586                 if (attr->cat_atime < lov_attr->cat_atime)
587                         attr->cat_atime = lov_attr->cat_atime;
588                 if (attr->cat_ctime < lov_attr->cat_ctime)
589                         attr->cat_ctime = lov_attr->cat_ctime;
590                 if (attr->cat_mtime < lov_attr->cat_mtime)
591                         attr->cat_mtime = lov_attr->cat_mtime;
592         }
593         RETURN(result);
594 }
595
596 static int lov_find_cbdata_empty(const struct lu_env *env,
597                                  struct cl_object *obj, ldlm_iterator_t iter,
598                                  void *data)
599 {
600         return 0;
601 }
602
603 static int lov_find_cbdata_raid0(const struct lu_env *env,
604                                  struct cl_object *obj, ldlm_iterator_t iter,
605                                  void *data)
606 {
607         struct lov_object       *lov = cl2lov(obj);
608         struct lov_layout_raid0 *r0 = lov_r0(lov);
609         struct cl_object        *subobj;
610         int                     i;
611         int                     rc = 0;
612
613         for (i = 0; i < r0->lo_nr; ++i) {
614                 if (r0->lo_sub[i] == NULL)
615                         continue;
616
617                 subobj = lovsub2cl(r0->lo_sub[i]);
618
619                 rc = cl_object_find_cbdata(env, subobj, iter, data);
620                 if (rc != 0)
621                         break;
622         }
623
624         return rc;
625 }
626
627 const static struct lov_layout_operations lov_dispatch[] = {
628         [LLT_EMPTY] = {
629                 .llo_init      = lov_init_empty,
630                 .llo_delete    = lov_delete_empty,
631                 .llo_fini      = lov_fini_empty,
632                 .llo_install   = lov_install_empty,
633                 .llo_print     = lov_print_empty,
634                 .llo_page_init = lov_page_init_empty,
635                 .llo_lock_init = lov_lock_init_empty,
636                 .llo_io_init   = lov_io_init_empty,
637                 .llo_getattr   = lov_attr_get_empty,
638                 .llo_find_cbdata = lov_find_cbdata_empty
639         },
640         [LLT_RAID0] = {
641                 .llo_init      = lov_init_raid0,
642                 .llo_delete    = lov_delete_raid0,
643                 .llo_fini      = lov_fini_raid0,
644                 .llo_install   = lov_install_raid0,
645                 .llo_print     = lov_print_raid0,
646                 .llo_page_init = lov_page_init_raid0,
647                 .llo_lock_init = lov_lock_init_raid0,
648                 .llo_io_init   = lov_io_init_raid0,
649                 .llo_getattr   = lov_attr_get_raid0,
650                 .llo_find_cbdata = lov_find_cbdata_raid0
651         },
652         [LLT_RELEASED] = {
653                 .llo_init      = lov_init_released,
654                 .llo_delete    = lov_delete_empty,
655                 .llo_fini      = lov_fini_released,
656                 .llo_install   = lov_install_empty,
657                 .llo_print     = lov_print_released,
658                 .llo_page_init = lov_page_init_empty,
659                 .llo_lock_init = lov_lock_init_empty,
660                 .llo_io_init   = lov_io_init_released,
661                 .llo_getattr   = lov_attr_get_empty,
662                 .llo_find_cbdata = lov_find_cbdata_empty
663         }
664 };
665
/**
 * Performs a double-dispatch based on the layout type of an object: reads
 * \a obj's lo_type, asserts that it indexes lov_dispatch[] and calls method
 * \a op of the selected table with the remaining arguments. The value of
 * the statement expression is the value returned by the method.
 *
 * No locking is done here; the caller is responsible for keeping lo_type
 * stable.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
({                                                                      \
        struct lov_object       *__obj = (obj);                         \
        enum lov_layout_type     __llt = __obj->lo_type;                \
                                                                        \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
})
678
679 /**
680  * Return lov_layout_type associated with a given lsm
681  */
682 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
683 {
684         if (lsm == NULL)
685                 return LLT_EMPTY;
686         if (lsm_is_released(lsm))
687                 return LLT_RELEASED;
688         return LLT_RAID0;
689 }
690
691 static inline void lov_conf_freeze(struct lov_object *lov)
692 {
693         if (lov->lo_owner != current)
694                 down_read(&lov->lo_type_guard);
695 }
696
697 static inline void lov_conf_thaw(struct lov_object *lov)
698 {
699         if (lov->lo_owner != current)
700                 up_read(&lov->lo_type_guard);
701 }
702
/**
 * Performs a double-dispatch on method \a op of \a obj, wrapped in
 * lov_conf_freeze()/lov_conf_thaw() when \a lock is non-zero. The value of
 * the statement expression is the value returned by the method.
 *
 * \a obj is evaluated exactly once: the pointer is captured in a local
 * whose name differs from the one used inside LOV_2DISPATCH_NOLOCK(), so it
 * can be forwarded to that macro without self-initialization.
 */
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
({                                                                      \
        struct lov_object                      *__mobj = (obj);         \
        int                                     __lock = !!(lock);      \
        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
                                                                        \
        if (__lock)                                                     \
                lov_conf_freeze(__mobj);                                \
        __result = LOV_2DISPATCH_NOLOCK(__mobj, op, __VA_ARGS__);       \
        if (__lock)                                                     \
                lov_conf_thaw(__mobj);                                  \
        __result;                                                       \
})
716
/**
 * Performs a locked double-dispatch based on the layout type of an object.
 * Shorthand for LOV_2DISPATCH_MAYLOCK() with locking always enabled.
 */
#define LOV_2DISPATCH(obj, op, ...)                                     \
        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
722
/**
 * Locked double-dispatch for methods returning void: the layout type is
 * read and method \a op is called between lov_conf_freeze() and
 * lov_conf_thaw() on \a obj.
 */
#define LOV_2DISPATCH_VOID(obj, op, ...)                                \
do {                                                                    \
        struct lov_object       *__obj = (obj);                         \
        enum lov_layout_type     __llt;                                 \
                                                                        \
        lov_conf_freeze(__obj);                                         \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
        lov_conf_thaw(__obj);                                           \
} while (0)
734
735 static void lov_conf_lock(struct lov_object *lov)
736 {
737         LASSERT(lov->lo_owner != current);
738         down_write(&lov->lo_type_guard);
739         LASSERT(lov->lo_owner == NULL);
740         lov->lo_owner = current;
741 }
742
743 static void lov_conf_unlock(struct lov_object *lov)
744 {
745         lov->lo_owner = NULL;
746         up_write(&lov->lo_type_guard);
747 }
748
749 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
750 {
751         struct l_wait_info lwi = { 0 };
752         ENTRY;
753
754         while (atomic_read(&lov->lo_active_ios) > 0) {
755                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
756                         PFID(lu_object_fid(lov2lu(lov))),
757                         atomic_read(&lov->lo_active_ios));
758
759                 l_wait_event(lov->lo_waitq,
760                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
761         }
762         RETURN(0);
763 }
764
765 static int lov_layout_change(const struct lu_env *unused,
766                              struct lov_object *lov, struct lov_stripe_md *lsm,
767                              const struct cl_object_conf *conf)
768 {
769         enum lov_layout_type llt = lov_type(lsm);
770         union lov_layout_state *state = &lov->u;
771         const struct lov_layout_operations *old_ops;
772         const struct lov_layout_operations *new_ops;
773         void *cookie;
774         struct lu_env *env;
775         __u16 refcheck;
776         int rc;
777         ENTRY;
778
779         LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
780
781         cookie = cl_env_reenter();
782         env = cl_env_get(&refcheck);
783         if (IS_ERR(env)) {
784                 cl_env_reexit(cookie);
785                 RETURN(PTR_ERR(env));
786         }
787
788         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
789
790         CDEBUG(D_INODE, DFID" from %s to %s\n",
791                PFID(lu_object_fid(lov2lu(lov))),
792                llt2str(lov->lo_type), llt2str(llt));
793
794         old_ops = &lov_dispatch[lov->lo_type];
795         new_ops = &lov_dispatch[llt];
796
797         rc = cl_object_prune(env, &lov->lo_cl);
798         if (rc != 0)
799                 GOTO(out, rc);
800
801         rc = old_ops->llo_delete(env, lov, &lov->u);
802         if (rc != 0)
803                 GOTO(out, rc);
804
805         old_ops->llo_fini(env, lov, &lov->u);
806
807         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
808
809         lov->lo_type = LLT_EMPTY;
810
811         /* page bufsize fixup */
812         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
813                 lov_page_slice_fixup(lov, NULL);
814
815         rc = new_ops->llo_init(env, lov_object_dev(lov), lov, lsm, conf, state);
816         if (rc != 0) {
817                 new_ops->llo_delete(env, lov, state);
818                 new_ops->llo_fini(env, lov, state);
819                 /* this file becomes an EMPTY file. */
820                 GOTO(out, rc);
821         }
822
823         new_ops->llo_install(env, lov, state);
824         lov->lo_type = llt;
825
826 out:
827         cl_env_put(env, &refcheck);
828         cl_env_reexit(cookie);
829
830         RETURN(rc);
831 }
832
833 /*****************************************************************************
834  *
835  * Lov object operations.
836  *
837  */
838 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
839                     const struct lu_object_conf *conf)
840 {
841         struct lov_object            *lov   = lu2lov(obj);
842         struct lov_device            *dev   = lov_object_dev(lov);
843         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
844         union lov_layout_state       *set   = &lov->u;
845         const struct lov_layout_operations *ops;
846         struct lov_stripe_md *lsm = NULL;
847         int rc;
848         ENTRY;
849
850         init_rwsem(&lov->lo_type_guard);
851         atomic_set(&lov->lo_active_ios, 0);
852         init_waitqueue_head(&lov->lo_waitq);
853         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
854
855         lov->lo_type = LLT_EMPTY;
856         if (cconf->u.coc_layout.lb_buf != NULL) {
857                 lsm = lov_unpackmd(dev->ld_lov,
858                                    cconf->u.coc_layout.lb_buf,
859                                    cconf->u.coc_layout.lb_len);
860                 if (IS_ERR(lsm))
861                         RETURN(PTR_ERR(lsm));
862         }
863
864         /* no locking is necessary, as object is being created */
865         lov->lo_type = lov_type(lsm);
866         ops = &lov_dispatch[lov->lo_type];
867         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
868         if (rc != 0)
869                 GOTO(out_lsm, rc);
870
871         ops->llo_install(env, lov, set);
872
873 out_lsm:
874         lov_lsm_put(lsm);
875
876         RETURN(rc);
877 }
878
879 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
880                         const struct cl_object_conf *conf)
881 {
882         struct lov_stripe_md    *lsm = NULL;
883         struct lov_object       *lov = cl2lov(obj);
884         int                      result = 0;
885         ENTRY;
886
887         if (conf->coc_opc == OBJECT_CONF_SET &&
888             conf->u.coc_layout.lb_buf != NULL) {
889                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
890                                    conf->u.coc_layout.lb_buf,
891                                    conf->u.coc_layout.lb_len);
892                 if (IS_ERR(lsm))
893                         RETURN(PTR_ERR(lsm));
894         }
895
896         lov_conf_lock(lov);
897         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
898                 lov->lo_layout_invalid = true;
899                 GOTO(out, result = 0);
900         }
901
902         if (conf->coc_opc == OBJECT_CONF_WAIT) {
903                 if (lov->lo_layout_invalid &&
904                     atomic_read(&lov->lo_active_ios) > 0) {
905                         lov_conf_unlock(lov);
906                         result = lov_layout_wait(env, lov);
907                         lov_conf_lock(lov);
908                 }
909                 GOTO(out, result);
910         }
911
912         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
913
914         if ((lsm == NULL && lov->lo_lsm == NULL) ||
915             ((lsm != NULL && lov->lo_lsm != NULL) &&
916              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
917              (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
918                 /* same version of layout */
919                 lov->lo_layout_invalid = false;
920                 GOTO(out, result = 0);
921         }
922
923         /* will change layout - check if there still exists active IO. */
924         if (atomic_read(&lov->lo_active_ios) > 0) {
925                 lov->lo_layout_invalid = true;
926                 GOTO(out, result = -EBUSY);
927         }
928
929         result = lov_layout_change(env, lov, lsm, conf);
930         lov->lo_layout_invalid = result != 0;
931         EXIT;
932
933 out:
934         lov_conf_unlock(lov);
935         lov_lsm_put(lsm);
936         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
937                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
938         RETURN(result);
939 }
940
941 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
942 {
943         struct lov_object *lov = lu2lov(obj);
944
945         ENTRY;
946         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
947         EXIT;
948 }
949
950 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
951 {
952         struct lov_object *lov = lu2lov(obj);
953
954         ENTRY;
955         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
956         lu_object_fini(obj);
957         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
958         EXIT;
959 }
960
961 static int lov_object_print(const struct lu_env *env, void *cookie,
962                             lu_printer_t p, const struct lu_object *o)
963 {
964         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
965 }
966
967 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
968                   struct cl_page *page, pgoff_t index)
969 {
970         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
971                                     index);
972 }
973
974 /**
975  * Implements cl_object_operations::clo_io_init() method for lov
976  * layer. Dispatches to the appropriate layout io initialization method.
977  */
978 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
979                 struct cl_io *io)
980 {
981         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
982         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
983                                      !io->ci_ignore_layout, env, obj, io);
984 }
985
986 /**
987  * An implementation of cl_object_operations::clo_attr_get() method for lov
988  * layer. For raid0 layout this collects and merges attributes of all
989  * sub-objects.
990  */
991 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
992                         struct cl_attr *attr)
993 {
994         /* do not take lock, as this function is called under a
995          * spin-lock. Layout is protected from changing by ongoing IO. */
996         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
997 }
998
/**
 * Attribute update hook of the lov layer. No layout implements this
 * operation, so nothing is dispatched and success is reported.
 *
 * \retval 0    always
 */
static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
                           const struct cl_attr *attr, unsigned valid)
{
        return 0;
}
1007
1008 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
1009                   struct cl_lock *lock, const struct cl_io *io)
1010 {
1011         /* No need to lock because we've taken one refcount of layout.  */
1012         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
1013                                     io);
1014 }
1015
1016 /**
1017  * We calculate on which OST the mapping will end. If the length of mapping
1018  * is greater than (stripe_size * stripe_count) then the last_stripe will
1019  * will be one just before start_stripe. Else we check if the mapping
1020  * intersects each OST and find last_stripe.
1021  * This function returns the last_stripe and also sets the stripe_count
1022  * over which the mapping is spread
1023  *
1024  * \param lsm [in]              striping information for the file
1025  * \param fm_start [in]         logical start of mapping
1026  * \param fm_end [in]           logical end of mapping
1027  * \param start_stripe [in]     starting stripe of the mapping
1028  * \param stripe_count [out]    the number of stripes across which to map is
1029  *                              returned
1030  *
1031  * \retval last_stripe          return the last stripe of the mapping
1032  */
1033 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
1034                                    loff_t fm_start, loff_t fm_end,
1035                                    int start_stripe, int *stripe_count)
1036 {
1037         int last_stripe;
1038         loff_t obd_start;
1039         loff_t obd_end;
1040         int i, j;
1041
1042         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
1043                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
1044                                                               start_stripe - 1);
1045                 *stripe_count = lsm->lsm_stripe_count;
1046         } else {
1047                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
1048                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
1049                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1050                                                    &obd_start, &obd_end)) == 0)
1051                                 break;
1052                 }
1053                 *stripe_count = j;
1054                 last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count;
1055         }
1056
1057         return last_stripe;
1058 }
1059
1060 /**
1061  * Set fe_device and copy extents from local buffer into main return buffer.
1062  *
1063  * \param fiemap [out]          fiemap to hold all extents
1064  * \param lcl_fm_ext [in]       array of fiemap extents get from OSC layer
1065  * \param ost_index [in]        OST index to be written into the fm_device
1066  *                              field for each extent
1067  * \param ext_count [in]        number of extents to be copied
1068  * \param current_extent [in]   where to start copying in the extent array
1069  */
1070 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1071                                          struct fiemap_extent *lcl_fm_ext,
1072                                          int ost_index, unsigned int ext_count,
1073                                          int current_extent)
1074 {
1075         char            *to;
1076         unsigned int    ext;
1077
1078         for (ext = 0; ext < ext_count; ext++) {
1079                 lcl_fm_ext[ext].fe_device = ost_index;
1080                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1081         }
1082
1083         /* Copy fm_extent's from fm_local to return buffer */
1084         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1085         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
1086 }
1087
1088 #define FIEMAP_BUFFER_SIZE 4096
1089
1090 /**
1091  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1092  * call. The local end offset and the device are sent in the first
1093  * fm_extent. This function calculates the stripe number from the index.
1094  * This function returns a stripe_no on which mapping is to be restarted.
1095  *
1096  * This function returns fm_end_offset which is the in-OST offset at which
1097  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1098  * will re-calculate proper offset in next stripe.
1099  * Note that the first extent is passed to lov_get_info via the value field.
1100  *
1101  * \param fiemap [in]           fiemap request header
1102  * \param lsm [in]              striping information for the file
1103  * \param fm_start [in]         logical start of mapping
1104  * \param fm_end [in]           logical end of mapping
1105  * \param start_stripe [out]    starting stripe will be returned in this
1106  */
1107 static loff_t fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1108                                         struct lov_stripe_md *lsm,
1109                                         loff_t fm_start, loff_t fm_end,
1110                                         int *start_stripe)
1111 {
1112         loff_t local_end = fiemap->fm_extents[0].fe_logical;
1113         loff_t lun_start;
1114         loff_t lun_end;
1115         loff_t fm_end_offset;
1116         int stripe_no = -1;
1117         int i;
1118
1119         if (fiemap->fm_extent_count == 0 ||
1120             fiemap->fm_extents[0].fe_logical == 0)
1121                 return 0;
1122
1123         /* Find out stripe_no from ost_index saved in the fe_device */
1124         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1125                 struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
1126
1127                 if (lov_oinfo_is_dummy(oinfo))
1128                         continue;
1129
1130                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1131                         stripe_no = i;
1132                         break;
1133                 }
1134         }
1135
1136         if (stripe_no == -1)
1137                 return -EINVAL;
1138
1139         /* If we have finished mapping on previous device, shift logical
1140          * offset to start of next device */
1141         if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1142                                   &lun_start, &lun_end) != 0 &&
1143             local_end < lun_end) {
1144                 fm_end_offset = local_end;
1145                 *start_stripe = stripe_no;
1146         } else {
1147                 /* This is a special value to indicate that caller should
1148                  * calculate offset in next stripe. */
1149                 fm_end_offset = 0;
1150                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
1151         }
1152
1153         return fm_end_offset;
1154 }
1155
1156 /**
1157  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1158  * This also handles the restarting of FIEMAP calls in case mapping overflows
1159  * the available number of extents in single call.
1160  *
1161  * \param env [in]              lustre environment
1162  * \param obj [in]              file object
1163  * \param fmkey [in]            fiemap request header and other info
1164  * \param fiemap [out]          fiemap buffer holding retrived map extents
1165  * \param buflen [in/out]       max buffer length of @fiemap, when iterate
1166  *                              each OST, it is used to limit max map needed
1167  * \retval 0    success
1168  * \retval < 0  error
1169  */
1170 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1171                              struct ll_fiemap_info_key *fmkey,
1172                              struct fiemap *fiemap, size_t *buflen)
1173 {
1174         struct lov_stripe_md    *lsm;
1175         struct cl_object        *subobj = NULL;
1176         struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1177         struct fiemap           *fm_local = NULL;
1178         struct fiemap_extent    *lcl_fm_ext;
1179         loff_t                  fm_start;
1180         loff_t                  fm_end;
1181         loff_t                  fm_length;
1182         loff_t                  fm_end_offset;
1183         int                     count_local;
1184         int                     ost_index = 0;
1185         int                     start_stripe;
1186         int                     current_extent = 0;
1187         int                     rc = 0;
1188         int                     last_stripe;
1189         int                     cur_stripe = 0;
1190         int                     cur_stripe_wrap = 0;
1191         int                     stripe_count;
1192         unsigned int            buffer_size = FIEMAP_BUFFER_SIZE;
1193         /* Whether have we collected enough extents */
1194         bool                    enough = false;
1195         /* EOF for object */
1196         bool                    ost_eof = false;
1197         /* done with required mapping for this OST? */
1198         bool                    ost_done = false;
1199         ENTRY;
1200
1201         lsm = lov_lsm_addref(cl2lov(obj));
1202         if (lsm == NULL)
1203                 RETURN(-ENODATA);
1204
1205         /**
1206          * If the stripe_count > 1 and the application does not understand
1207          * DEVICE_ORDER flag, it cannot interpret the extents correctly.
1208          */
1209         if (lsm->lsm_stripe_count > 1 && !(fiemap->fm_flags &
1210                                            FIEMAP_FLAG_DEVICE_ORDER))
1211                 GOTO(out, rc = -ENOTSUPP);
1212
1213         if (lsm_is_released(lsm)) {
1214                 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1215                         /**
1216                          * released file, return a minimal FIEMAP if
1217                          * request fits in file-size.
1218                          */
1219                         fiemap->fm_mapped_extents = 1;
1220                         fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1221                         if (fiemap->fm_start + fiemap->fm_length <
1222                             fmkey->lfik_oa.o_size)
1223                                 fiemap->fm_extents[0].fe_length =
1224                                         fiemap->fm_length;
1225                         else
1226                                 fiemap->fm_extents[0].fe_length =
1227                                         fmkey->lfik_oa.o_size -
1228                                         fiemap->fm_start;
1229                         fiemap->fm_extents[0].fe_flags |=
1230                                 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1231                 }
1232                 GOTO(out, rc = 0);
1233         }
1234
1235         if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1236                 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1237
1238         OBD_ALLOC_LARGE(fm_local, buffer_size);
1239         if (fm_local == NULL)
1240                 GOTO(out, rc = -ENOMEM);
1241         lcl_fm_ext = &fm_local->fm_extents[0];
1242         count_local = fiemap_size_to_count(buffer_size);
1243
1244         fm_start = fiemap->fm_start;
1245         fm_length = fiemap->fm_length;
1246         /* Calculate start stripe, last stripe and length of mapping */
1247         start_stripe = lov_stripe_number(lsm, fm_start);
1248         fm_end = (fm_length == ~0ULL) ? fmkey->lfik_oa.o_size :
1249                                         fm_start + fm_length - 1;
1250         /* If fm_length != ~0ULL but fm_start_fm_length-1 exceeds file size */
1251         if (fm_end > fmkey->lfik_oa.o_size)
1252                 fm_end = fmkey->lfik_oa.o_size;
1253
1254         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
1255                                               start_stripe, &stripe_count);
1256         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
1257                                                   &start_stripe);
1258         if (fm_end_offset == -EINVAL)
1259                 GOTO(out, rc = -EINVAL);
1260
1261         /**
1262          * Requested extent count exceeds the fiemap buffer size, shrink our
1263          * ambition.
1264          */
1265         if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1266                 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1267         if (fiemap->fm_extent_count == 0)
1268                 count_local = 0;
1269
1270         /* Check each stripe */
1271         for (cur_stripe = start_stripe; stripe_count > 0;
1272              --stripe_count,
1273              cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
1274                 loff_t req_fm_len; /* Stores length of required mapping */
1275                 loff_t len_mapped_single_call;
1276                 loff_t lun_start;
1277                 loff_t lun_end;
1278                 loff_t obd_object_end;
1279                 unsigned int ext_count;
1280
1281                 cur_stripe_wrap = cur_stripe;
1282
1283                 /* Find out range of mapping on this stripe */
1284                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
1285                                            &lun_start, &obd_object_end)) == 0)
1286                         continue;
1287
1288                 if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe]))
1289                         GOTO(out, rc = -EIO);
1290
1291                 /* If this is a continuation FIEMAP call and we are on
1292                  * starting stripe then lun_start needs to be set to
1293                  * fm_end_offset */
1294                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
1295                         lun_start = fm_end_offset;
1296
1297                 if (fm_length != ~0ULL) {
1298                         /* Handle fm_start + fm_length overflow */
1299                         if (fm_start + fm_length < fm_start)
1300                                 fm_length = ~0ULL - fm_start;
1301                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
1302                                                      cur_stripe);
1303                 } else {
1304                         lun_end = ~0ULL;
1305                 }
1306
1307                 if (lun_start == lun_end)
1308                         continue;
1309
1310                 req_fm_len = obd_object_end - lun_start;
1311                 fm_local->fm_length = 0;
1312                 len_mapped_single_call = 0;
1313
1314                 /* find lobsub object */
1315                 subobj = lov_find_subobj(env, cl2lov(obj), lsm,
1316                                              cur_stripe);
1317                 if (IS_ERR(subobj))
1318                         GOTO(out, rc = PTR_ERR(subobj));
1319                 /* If the output buffer is very large and the objects have many
1320                  * extents we may need to loop on a single OST repeatedly */
1321                 ost_eof = false;
1322                 ost_done = false;
1323                 do {
1324                         if (fiemap->fm_extent_count > 0) {
1325                                 /* Don't get too many extents. */
1326                                 if (current_extent + count_local >
1327                                     fiemap->fm_extent_count)
1328                                         count_local = fiemap->fm_extent_count -
1329                                                       current_extent;
1330                         }
1331
1332                         lun_start += len_mapped_single_call;
1333                         fm_local->fm_length = req_fm_len -
1334                                               len_mapped_single_call;
1335                         req_fm_len = fm_local->fm_length;
1336                         fm_local->fm_extent_count = enough ? 1 : count_local;
1337                         fm_local->fm_mapped_extents = 0;
1338                         fm_local->fm_flags = fiemap->fm_flags;
1339
1340                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
1341
1342                         if (ost_index < 0 ||
1343                             ost_index >= lov->desc.ld_tgt_count)
1344                                 GOTO(obj_put, rc = -EINVAL);
1345                         /* If OST is inactive, return extent with UNKNOWN
1346                          * flag. */
1347                         if (!lov->lov_tgts[ost_index]->ltd_active) {
1348                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
1349                                 fm_local->fm_mapped_extents = 1;
1350
1351                                 lcl_fm_ext[0].fe_logical = lun_start;
1352                                 lcl_fm_ext[0].fe_length = obd_object_end -
1353                                                           lun_start;
1354                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1355
1356                                 goto inactive_tgt;
1357                         }
1358
1359                         fm_local->fm_start = lun_start;
1360                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1361                         memcpy(&fmkey->lfik_fiemap, fm_local,
1362                                sizeof(*fm_local));
1363                         *buflen = fiemap_count_to_size(
1364                                                 fm_local->fm_extent_count);
1365
1366                         rc = cl_object_fiemap(env, subobj, fmkey, fm_local,
1367                                               buflen);
1368                         if (rc != 0)
1369                                 GOTO(obj_put, rc);
1370 inactive_tgt:
1371                         ext_count = fm_local->fm_mapped_extents;
1372                         if (ext_count == 0) {
1373                                 ost_done = true;
1374                                 /* If last stripe has hold at the end,
1375                                  * we need to return */
1376                                 if (cur_stripe_wrap == last_stripe) {
1377                                         fiemap->fm_mapped_extents = 0;
1378                                         goto finish;
1379                                 }
1380                                 break;
1381                         } else if (enough) {
1382                                 /*
1383                                  * We've collected enough extents and there are
1384                                  * more extents after it.
1385                                  */
1386                                 goto finish;
1387                         }
1388
1389                         /* If we just need num of extents, got to next device */
1390                         if (fiemap->fm_extent_count == 0) {
1391                                 current_extent += ext_count;
1392                                 break;
1393                         }
1394
1395                         /* prepare to copy retrived map extents */
1396                         len_mapped_single_call =
1397                                 lcl_fm_ext[ext_count - 1].fe_logical -
1398                                 lun_start + lcl_fm_ext[ext_count - 1].fe_length;
1399
1400                         /* Have we finished mapping on this device? */
1401                         if (req_fm_len <= len_mapped_single_call)
1402                                 ost_done = true;
1403
1404                         /* Clear the EXTENT_LAST flag which can be present on
1405                          * the last extent */
1406                         if (lcl_fm_ext[ext_count - 1].fe_flags &
1407                             FIEMAP_EXTENT_LAST)
1408                                 lcl_fm_ext[ext_count - 1].fe_flags &=
1409                                                         ~FIEMAP_EXTENT_LAST;
1410                         if (lov_stripe_size(lsm,
1411                                         lcl_fm_ext[ext_count - 1].fe_logical +
1412                                         lcl_fm_ext[ext_count - 1].fe_length,
1413                                         cur_stripe) >= fmkey->lfik_oa.o_size)
1414                                 ost_eof = true;
1415
1416                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
1417                                                      ost_index, ext_count,
1418                                                      current_extent);
1419                         current_extent += ext_count;
1420
1421                         /* Ran out of available extents? */
1422                         if (current_extent >= fiemap->fm_extent_count)
1423                                 enough = true;
1424                 } while (!ost_done && !ost_eof);
1425
1426                 cl_object_put(env, subobj);
1427                 subobj = NULL;
1428
1429                 if (cur_stripe_wrap == last_stripe)
1430                         goto finish;
1431         } /* for each stripe */
1432 finish:
1433         /* Indicate that we are returning device offsets unless file just has
1434          * single stripe */
1435         if (lsm->lsm_stripe_count > 1)
1436                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1437
1438         if (fiemap->fm_extent_count == 0)
1439                 goto skip_last_device_calc;
1440
1441         /* Check if we have reached the last stripe and whether mapping for that
1442          * stripe is done. */
1443         if ((cur_stripe_wrap == last_stripe) && (ost_done || ost_eof))
1444                 fiemap->fm_extents[current_extent - 1].fe_flags |=
1445                                                              FIEMAP_EXTENT_LAST;
1446 skip_last_device_calc:
1447         fiemap->fm_mapped_extents = current_extent;
1448 obj_put:
1449         if (subobj != NULL)
1450                 cl_object_put(env, subobj);
1451 out:
1452         if (fm_local != NULL)
1453                 OBD_FREE_LARGE(fm_local, buffer_size);
1454
1455         lov_lsm_put(lsm);
1456
1457         return rc;
1458 }
1459
1460 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1461                                 struct lov_user_md __user *lum)
1462 {
1463         struct lov_object       *lov = cl2lov(obj);
1464         struct lov_stripe_md    *lsm;
1465         int                     rc = 0;
1466         ENTRY;
1467
1468         lsm = lov_lsm_addref(lov);
1469         if (lsm == NULL)
1470                 RETURN(-ENODATA);
1471
1472         rc = lov_getstripe(cl2lov(obj), lsm, lum);
1473         lov_lsm_put(lsm);
1474         RETURN(rc);
1475 }
1476
1477 static int lov_object_layout_get(const struct lu_env *env,
1478                                  struct cl_object *obj,
1479                                  struct cl_layout *cl)
1480 {
1481         struct lov_object *lov = cl2lov(obj);
1482         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1483         struct lu_buf *buf = &cl->cl_buf;
1484         ssize_t rc;
1485         ENTRY;
1486
1487         if (lsm == NULL) {
1488                 cl->cl_size = 0;
1489                 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1490
1491                 RETURN(0);
1492         }
1493
1494         cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
1495         cl->cl_layout_gen = lsm->lsm_layout_gen;
1496
1497         rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
1498         lov_lsm_put(lsm);
1499
1500         RETURN(rc < 0 ? rc : 0);
1501 }
1502
1503 static loff_t lov_object_maxbytes(struct cl_object *obj)
1504 {
1505         struct lov_object *lov = cl2lov(obj);
1506         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1507         loff_t maxbytes;
1508
1509         if (lsm == NULL)
1510                 return LLONG_MAX;
1511
1512         maxbytes = lsm->lsm_maxbytes;
1513
1514         lov_lsm_put(lsm);
1515
1516         return maxbytes;
1517 }
1518
1519 static int lov_object_find_cbdata(const struct lu_env *env,
1520                                   struct cl_object *obj, ldlm_iterator_t iter,
1521                                   void *data)
1522 {
1523         int rc;
1524         ENTRY;
1525
1526         /* call cl_object_find_cbdata for sub obj */
1527         rc = LOV_2DISPATCH(cl2lov(obj), llo_find_cbdata, env, obj, iter, data);
1528         RETURN(rc);
1529 }
1530
1531 static const struct cl_object_operations lov_ops = {
1532         .coo_page_init    = lov_page_init,
1533         .coo_lock_init    = lov_lock_init,
1534         .coo_io_init      = lov_io_init,
1535         .coo_attr_get     = lov_attr_get,
1536         .coo_attr_update  = lov_attr_update,
1537         .coo_conf_set     = lov_conf_set,
1538         .coo_getstripe    = lov_object_getstripe,
1539         .coo_layout_get   = lov_object_layout_get,
1540         .coo_maxbytes     = lov_object_maxbytes,
1541         .coo_find_cbdata  = lov_object_find_cbdata,
1542         .coo_fiemap       = lov_object_fiemap,
1543 };
1544
1545 static const struct lu_object_operations lov_lu_obj_ops = {
1546         .loo_object_init      = lov_object_init,
1547         .loo_object_delete    = lov_object_delete,
1548         .loo_object_release   = NULL,
1549         .loo_object_free      = lov_object_free,
1550         .loo_object_print     = lov_object_print,
1551         .loo_object_invariant = NULL
1552 };
1553
1554 struct lu_object *lov_object_alloc(const struct lu_env *env,
1555                                    const struct lu_object_header *unused,
1556                                    struct lu_device *dev)
1557 {
1558         struct lov_object *lov;
1559         struct lu_object  *obj;
1560
1561         ENTRY;
1562         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1563         if (lov != NULL) {
1564                 obj = lov2lu(lov);
1565                 lu_object_init(obj, NULL, dev);
1566                 lov->lo_cl.co_ops = &lov_ops;
1567                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1568                 /*
1569                  * object io operation vector (cl_object::co_iop) is installed
1570                  * later in lov_object_init(), as different vectors are used
1571                  * for object with different layouts.
1572                  */
1573                 obj->lo_ops = &lov_lu_obj_ops;
1574         } else
1575                 obj = NULL;
1576         RETURN(obj);
1577 }
1578
1579 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1580 {
1581         struct lov_stripe_md *lsm = NULL;
1582
1583         lov_conf_freeze(lov);
1584         if (lov->lo_lsm != NULL) {
1585                 lsm = lsm_addref(lov->lo_lsm);
1586                 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1587                         lsm, atomic_read(&lsm->lsm_refc),
1588                         lov->lo_layout_invalid, current);
1589         }
1590         lov_conf_thaw(lov);
1591         return lsm;
1592 }
1593
1594 int lov_read_and_clear_async_rc(struct cl_object *clob)
1595 {
1596         struct lu_object *luobj;
1597         int rc = 0;
1598         ENTRY;
1599
1600         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1601                                  &lov_device_type);
1602         if (luobj != NULL) {
1603                 struct lov_object *lov = lu2lov(luobj);
1604
1605                 lov_conf_freeze(lov);
1606                 switch (lov->lo_type) {
1607                 case LLT_RAID0: {
1608                         struct lov_stripe_md *lsm;
1609                         int i;
1610
1611                         lsm = lov->lo_lsm;
1612                         LASSERT(lsm != NULL);
1613                         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1614                                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1615
1616                                 if (lov_oinfo_is_dummy(loi))
1617                                         continue;
1618
1619                                 if (loi->loi_ar.ar_rc && !rc)
1620                                         rc = loi->loi_ar.ar_rc;
1621                                 loi->loi_ar.ar_rc = 0;
1622                         }
1623                 }
1624                 case LLT_RELEASED:
1625                 case LLT_EMPTY:
1626                         break;
1627                 default:
1628                         LBUG();
1629                 }
1630                 lov_conf_thaw(lov);
1631         }
1632         RETURN(rc);
1633 }
1634 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1635
1636 /** @} lov */