Whamcloud - gitweb
LU-4257 obdclass: Get rid of cl_env hash table
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45
46 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
47 {
48         return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
49 }
50
51 /** \addtogroup lov
52  *  @{
53  */
54
55 /*****************************************************************************
56  *
57  * Layout operations.
58  *
59  */
60
/**
 * Per-layout-type method table.
 *
 * lov_dispatch[] holds one instance per layout type, and the LOV_2DISPATCH*()
 * macros pick the instance matching lov_object::lo_type.
 */
struct lov_layout_operations {
        /* Set up \a state for the layout described by \a lsm. Called by
         * lov_layout_change() once the previous layout has been finalized;
         * a non-zero return makes the caller undo via llo_delete/llo_fini. */
        int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
                        struct lov_object *lov, struct lov_stripe_md *lsm,
                        const struct cl_object_conf *conf,
                        union lov_layout_state *state);
        /* First tear-down step. lov_layout_change() aborts the layout switch
         * when this returns non-zero. */
        int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
                           union lov_layout_state *state);
        /* Second tear-down step, run after a successful llo_delete. */
        void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
                         union lov_layout_state *state);
        /* Called by lov_layout_change() after llo_init succeeded, just before
         * lo_type is switched to the new layout type. */
        void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state);
        /* Describe object \a o through printer \a p. */
        int  (*llo_print)(const struct lu_env *env, void *cookie,
                          lu_printer_t p, const struct lu_object *o);
        /* NOTE(review): implementations and callers are outside this chunk;
         * presumably initializes this layer's part of \a page -- confirm. */
        int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
                              struct cl_page *page, pgoff_t index);
        /* NOTE(review): implementations and callers are outside this chunk;
         * presumably initializes this layer's part of \a lock -- confirm. */
        int  (*llo_lock_init)(const struct lu_env *env,
                              struct cl_object *obj, struct cl_lock *lock,
                              const struct cl_io *io);
        /* NOTE(review): implementations and callers are outside this chunk;
         * presumably initializes this layer's part of \a io -- confirm. */
        int  (*llo_io_init)(const struct lu_env *env,
                            struct cl_object *obj, struct cl_io *io);
        /* Fill in the attributes in \a attr that this layer knows about. */
        int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
                            struct cl_attr *attr);
};
84
85 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
86
87 static void lov_lsm_put(struct lov_stripe_md *lsm)
88 {
89         if (lsm != NULL)
90                 lov_free_memmd(&lsm);
91 }
92
93 /*****************************************************************************
94  *
95  * Lov object layout operations.
96  *
97  */
98
/**
 * llo_install method used by the LLT_EMPTY and LLT_RELEASED entries of
 * lov_dispatch[]. A file without objects has nothing to install.
 */
static void lov_install_empty(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* Intentionally empty: there are no objects behind this file. */
        return;
}
107
/**
 * llo_init method for LLT_EMPTY: there is no state to set up, so this
 * always succeeds.
 *
 * \retval 0 always
 */
static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
                          struct lov_object *lov, struct lov_stripe_md *lsm,
                          const struct cl_object_conf *conf,
                          union lov_layout_state *state)
{
        int rc = 0;

        return rc;
}
115
/**
 * llo_install method for LLT_RAID0. Nothing is done at install time.
 */
static void lov_install_raid0(const struct lu_env *env,
                              struct lov_object *lov,
                              union lov_layout_state *state)
{
        /* Intentionally empty. */
        return;
}
121
122 static struct cl_object *lov_sub_find(const struct lu_env *env,
123                                       struct cl_device *dev,
124                                       const struct lu_fid *fid,
125                                       const struct cl_object_conf *conf)
126 {
127         struct lu_object *o;
128
129         ENTRY;
130         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
131         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
132         RETURN(lu2cl(o));
133 }
134
135 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
136                         struct cl_object *stripe, struct lov_layout_raid0 *r0,
137                         int idx)
138 {
139         struct cl_object_header *hdr;
140         struct cl_object_header *subhdr;
141         struct cl_object_header *parent;
142         struct lov_oinfo        *oinfo;
143         int result;
144
145         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
146                 /* For sanity:test_206.
147                  * Do not leave the object in cache to avoid accessing
148                  * freed memory. This is because osc_object is referring to
149                  * lov_oinfo of lsm_stripe_data which will be freed due to
150                  * this failure. */
151                 cl_object_kill(env, stripe);
152                 cl_object_put(env, stripe);
153                 return -EIO;
154         }
155
156         hdr    = cl_object_header(lov2cl(lov));
157         subhdr = cl_object_header(stripe);
158
159         oinfo = lov->lo_lsm->lsm_oinfo[idx];
160         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
161                " idx: %d gen: %d\n",
162                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
163                PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
164                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
165
166         /* reuse ->coh_attr_guard to protect coh_parent change */
167         spin_lock(&subhdr->coh_attr_guard);
168         parent = subhdr->coh_parent;
169         if (parent == NULL) {
170                 subhdr->coh_parent = hdr;
171                 spin_unlock(&subhdr->coh_attr_guard);
172                 subhdr->coh_nesting = hdr->coh_nesting + 1;
173                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
174                 r0->lo_sub[idx] = cl2lovsub(stripe);
175                 r0->lo_sub[idx]->lso_super = lov;
176                 r0->lo_sub[idx]->lso_index = idx;
177                 result = 0;
178         } else {
179                 struct lu_object  *old_obj;
180                 struct lov_object *old_lov;
181                 unsigned int mask = D_INODE;
182
183                 spin_unlock(&subhdr->coh_attr_guard);
184                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
185                 LASSERT(old_obj != NULL);
186                 old_lov = cl2lov(lu2cl(old_obj));
187                 if (old_lov->lo_layout_invalid) {
188                         /* the object's layout has already changed but isn't
189                          * refreshed */
190                         lu_object_unhash(env, &stripe->co_lu);
191                         result = -EAGAIN;
192                 } else {
193                         mask = D_ERROR;
194                         result = -EIO;
195                 }
196
197                 LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
198                                 "stripe %d is already owned.", idx);
199                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
200                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
201                 cl_object_put(env, stripe);
202         }
203         return result;
204 }
205
206 static int lov_page_slice_fixup(struct lov_object *lov,
207                                 struct cl_object *stripe)
208 {
209         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
210         struct cl_object *o;
211
212         if (stripe == NULL)
213                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
214                        cfs_size_round(sizeof(struct lov_page));
215
216         cl_object_for_each(o, stripe)
217                 o->co_slice_off += hdr->coh_page_bufsize;
218
219         return cl_object_header(stripe)->coh_page_bufsize;
220 }
221
222 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
223                           struct lov_object *lov, struct lov_stripe_md *lsm,
224                           const struct cl_object_conf *conf,
225                           union lov_layout_state *state)
226 {
227         int result;
228         int i;
229
230         struct cl_object        *stripe;
231         struct lov_thread_info  *lti     = lov_env_info(env);
232         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
233         struct lu_fid           *ofid    = &lti->lti_fid;
234         struct lov_layout_raid0 *r0      = &state->raid0;
235
236         ENTRY;
237
238         if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
239                 dump_lsm(D_ERROR, lsm);
240                 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
241                          LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
242         }
243
244         LASSERT(lov->lo_lsm == NULL);
245         lov->lo_lsm = lsm_addref(lsm);
246         r0->lo_nr = lsm->lsm_stripe_count;
247         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
248
249         lov->lo_layout_invalid = true;
250
251         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
252         if (r0->lo_sub != NULL) {
253                 int psz = 0;
254
255                 result = 0;
256                 subconf->coc_inode = conf->coc_inode;
257                 spin_lock_init(&r0->lo_sub_lock);
258                 /*
259                  * Create stripe cl_objects.
260                  */
261                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
262                         struct cl_device *subdev;
263                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
264                         int ost_idx = oinfo->loi_ost_idx;
265
266                         if (lov_oinfo_is_dummy(oinfo))
267                                 continue;
268
269                         result = ostid_to_fid(ofid, &oinfo->loi_oi,
270                                               oinfo->loi_ost_idx);
271                         if (result != 0)
272                                 GOTO(out, result);
273
274                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
275                         subconf->u.coc_oinfo = oinfo;
276                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
277                         /* In the function below, .hs_keycmp resolves to
278                          * lu_obj_hop_keycmp() */
279                         /* coverity[overrun-buffer-val] */
280                         stripe = lov_sub_find(env, subdev, ofid, subconf);
281                         if (!IS_ERR(stripe)) {
282                                 result = lov_init_sub(env, lov, stripe, r0, i);
283                                 if (result == -EAGAIN) { /* try again */
284                                         --i;
285                                         result = 0;
286                                         continue;
287                                 }
288                         } else {
289                                 result = PTR_ERR(stripe);
290                         }
291
292                         if (result == 0) {
293                                 int sz = lov_page_slice_fixup(lov, stripe);
294                                 LASSERT(ergo(psz > 0, psz == sz));
295                                 psz = sz;
296                         }
297                 }
298                 if (result == 0)
299                         cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
300         } else
301                 result = -ENOMEM;
302 out:
303         RETURN(result);
304 }
305
306 static int lov_init_released(const struct lu_env *env,
307                              struct lov_device *dev, struct lov_object *lov,
308                              struct lov_stripe_md *lsm,
309                              const struct cl_object_conf *conf,
310                              union lov_layout_state *state)
311 {
312         LASSERT(lsm != NULL);
313         LASSERT(lsm_is_released(lsm));
314         LASSERT(lov->lo_lsm == NULL);
315
316         lov->lo_lsm = lsm_addref(lsm);
317         return 0;
318 }
319
320 static struct cl_object *lov_find_subobj(const struct lu_env *env,
321                                          struct lov_object *lov,
322                                          struct lov_stripe_md *lsm,
323                                          int stripe_idx)
324 {
325         struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
326         struct lov_oinfo        *oinfo = lsm->lsm_oinfo[stripe_idx];
327         struct lov_thread_info  *lti = lov_env_info(env);
328         struct lu_fid           *ofid = &lti->lti_fid;
329         struct cl_device        *subdev;
330         int                     ost_idx;
331         int                     rc;
332         struct cl_object        *result;
333
334         if (lov->lo_type != LLT_RAID0)
335                 GOTO(out, result = NULL);
336
337         ost_idx = oinfo->loi_ost_idx;
338         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
339         if (rc != 0)
340                 GOTO(out, result = NULL);
341
342         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
343         result = lov_sub_find(env, subdev, ofid, NULL);
344 out:
345         if (result == NULL)
346                 result = ERR_PTR(-EINVAL);
347         return result;
348 }
349
350 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
351                             union lov_layout_state *state)
352 {
353         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
354
355         lov_layout_wait(env, lov);
356         return 0;
357 }
358
359 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
360                                struct lovsub_object *los, int idx)
361 {
362         struct cl_object        *sub;
363         struct lov_layout_raid0 *r0;
364         struct lu_site          *site;
365         struct lu_site_bkt_data *bkt;
366         wait_queue_t          *waiter;
367
368         r0  = &lov->u.raid0;
369         LASSERT(r0->lo_sub[idx] == los);
370
371         sub  = lovsub2cl(los);
372         site = sub->co_lu.lo_dev->ld_site;
373         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
374
375         cl_object_kill(env, sub);
376         /* release a reference to the sub-object and ... */
377         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
378         cl_object_put(env, sub);
379
380         /* ... wait until it is actually destroyed---sub-object clears its
381          * ->lo_sub[] slot in lovsub_object_fini() */
382         if (r0->lo_sub[idx] == los) {
383                 waiter = &lov_env_info(env)->lti_waiter;
384                 init_waitqueue_entry(waiter, current);
385                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
386                 set_current_state(TASK_UNINTERRUPTIBLE);
387                 while (1) {
388                         /* this wait-queue is signaled at the end of
389                          * lu_object_free(). */
390                         set_current_state(TASK_UNINTERRUPTIBLE);
391                         spin_lock(&r0->lo_sub_lock);
392                         if (r0->lo_sub[idx] == los) {
393                                 spin_unlock(&r0->lo_sub_lock);
394                                 schedule();
395                         } else {
396                                 spin_unlock(&r0->lo_sub_lock);
397                                 set_current_state(TASK_RUNNING);
398                                 break;
399                         }
400                 }
401                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
402         }
403         LASSERT(r0->lo_sub[idx] == NULL);
404 }
405
406 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
407                             union lov_layout_state *state)
408 {
409         struct lov_layout_raid0 *r0 = &state->raid0;
410         struct lov_stripe_md    *lsm = lov->lo_lsm;
411         int i;
412
413         ENTRY;
414
415         dump_lsm(D_INODE, lsm);
416
417         lov_layout_wait(env, lov);
418         if (r0->lo_sub != NULL) {
419                 for (i = 0; i < r0->lo_nr; ++i) {
420                         struct lovsub_object *los = r0->lo_sub[i];
421
422                         if (los != NULL) {
423                                 cl_object_prune(env, &los->lso_cl);
424                                 /*
425                                  * If top-level object is to be evicted from
426                                  * the cache, so are its sub-objects.
427                                  */
428                                 lov_subobject_kill(env, lov, los, i);
429                         }
430                 }
431         }
432         RETURN(0);
433 }
434
435 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
436                            union lov_layout_state *state)
437 {
438         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
439 }
440
441 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
442                            union lov_layout_state *state)
443 {
444         struct lov_layout_raid0 *r0 = &state->raid0;
445         ENTRY;
446
447         if (r0->lo_sub != NULL) {
448                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
449                 r0->lo_sub = NULL;
450         }
451
452         dump_lsm(D_INODE, lov->lo_lsm);
453         lov_free_memmd(&lov->lo_lsm);
454
455         EXIT;
456 }
457
458 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
459                                 union lov_layout_state *state)
460 {
461         ENTRY;
462         dump_lsm(D_INODE, lov->lo_lsm);
463         lov_free_memmd(&lov->lo_lsm);
464         EXIT;
465 }
466
467 static int lov_print_empty(const struct lu_env *env, void *cookie,
468                            lu_printer_t p, const struct lu_object *o)
469 {
470         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
471         return 0;
472 }
473
474 static int lov_print_raid0(const struct lu_env *env, void *cookie,
475                            lu_printer_t p, const struct lu_object *o)
476 {
477         struct lov_object       *lov = lu2lov(o);
478         struct lov_layout_raid0 *r0  = lov_r0(lov);
479         struct lov_stripe_md    *lsm = lov->lo_lsm;
480         int                      i;
481
482         (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
483                 r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
484                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
485                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
486         for (i = 0; i < r0->lo_nr; ++i) {
487                 struct lu_object *sub;
488
489                 if (r0->lo_sub[i] != NULL) {
490                         sub = lovsub2lu(r0->lo_sub[i]);
491                         lu_object_print(env, cookie, p, sub);
492                 } else {
493                         (*p)(env, cookie, "sub %d absent\n", i);
494                 }
495         }
496         return 0;
497 }
498
499 static int lov_print_released(const struct lu_env *env, void *cookie,
500                                 lu_printer_t p, const struct lu_object *o)
501 {
502         struct lov_object       *lov = lu2lov(o);
503         struct lov_stripe_md    *lsm = lov->lo_lsm;
504
505         (*p)(env, cookie,
506                 "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
507                 lov->lo_layout_invalid ? "invalid" : "valid", lsm,
508                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
509                 lsm->lsm_stripe_count, lsm->lsm_layout_gen);
510         return 0;
511 }
512
513 /**
514  * Implements cl_object_operations::coo_attr_get() method for an object
515  * without stripes (LLT_EMPTY layout type).
516  *
517  * The only attributes this layer is authoritative in this case is
518  * cl_attr::cat_blocks---it's 0.
519  */
520 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
521                               struct cl_attr *attr)
522 {
523         attr->cat_blocks = 0;
524         return 0;
525 }
526
527 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
528                               struct cl_attr *attr)
529 {
530         struct lov_object       *lov = cl2lov(obj);
531         struct lov_layout_raid0 *r0 = lov_r0(lov);
532         struct cl_attr          *lov_attr = &r0->lo_attr;
533         int                      result = 0;
534
535         ENTRY;
536
537         /* this is called w/o holding type guard mutex, so it must be inside
538          * an on going IO otherwise lsm may be replaced.
539          * LU-2117: it turns out there exists one exception. For mmaped files,
540          * the lock of those files may be requested in the other file's IO
541          * context, and this function is called in ccc_lock_state(), it will
542          * hit this assertion.
543          * Anyway, it's still okay to call attr_get w/o type guard as layout
544          * can't go if locks exist. */
545         /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
546
547         if (!r0->lo_attr_valid) {
548                 struct lov_stripe_md    *lsm = lov->lo_lsm;
549                 struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
550                 __u64                    kms = 0;
551
552                 memset(lvb, 0, sizeof(*lvb));
553                 /* XXX: timestamps can be negative by sanity:test_39m,
554                  * how can it be? */
555                 lvb->lvb_atime = LLONG_MIN;
556                 lvb->lvb_ctime = LLONG_MIN;
557                 lvb->lvb_mtime = LLONG_MIN;
558
559                 /*
560                  * XXX that should be replaced with a loop over sub-objects,
561                  * doing cl_object_attr_get() on them. But for now, let's
562                  * reuse old lov code.
563                  */
564
565                 /*
566                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
567                  * happy. It's not needed, because new code uses
568                  * ->coh_attr_guard spin-lock to protect consistency of
569                  * sub-object attributes.
570                  */
571                 lov_stripe_lock(lsm);
572                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
573                 lov_stripe_unlock(lsm);
574                 if (result == 0) {
575                         cl_lvb2attr(lov_attr, lvb);
576                         lov_attr->cat_kms = kms;
577                         r0->lo_attr_valid = 1;
578                 }
579         }
580         if (result == 0) { /* merge results */
581                 attr->cat_blocks = lov_attr->cat_blocks;
582                 attr->cat_size = lov_attr->cat_size;
583                 attr->cat_kms = lov_attr->cat_kms;
584                 if (attr->cat_atime < lov_attr->cat_atime)
585                         attr->cat_atime = lov_attr->cat_atime;
586                 if (attr->cat_ctime < lov_attr->cat_ctime)
587                         attr->cat_ctime = lov_attr->cat_ctime;
588                 if (attr->cat_mtime < lov_attr->cat_mtime)
589                         attr->cat_mtime = lov_attr->cat_mtime;
590         }
591         RETURN(result);
592 }
593
594 const static struct lov_layout_operations lov_dispatch[] = {
595         [LLT_EMPTY] = {
596                 .llo_init      = lov_init_empty,
597                 .llo_delete    = lov_delete_empty,
598                 .llo_fini      = lov_fini_empty,
599                 .llo_install   = lov_install_empty,
600                 .llo_print     = lov_print_empty,
601                 .llo_page_init = lov_page_init_empty,
602                 .llo_lock_init = lov_lock_init_empty,
603                 .llo_io_init   = lov_io_init_empty,
604                 .llo_getattr   = lov_attr_get_empty,
605         },
606         [LLT_RAID0] = {
607                 .llo_init      = lov_init_raid0,
608                 .llo_delete    = lov_delete_raid0,
609                 .llo_fini      = lov_fini_raid0,
610                 .llo_install   = lov_install_raid0,
611                 .llo_print     = lov_print_raid0,
612                 .llo_page_init = lov_page_init_raid0,
613                 .llo_lock_init = lov_lock_init_raid0,
614                 .llo_io_init   = lov_io_init_raid0,
615                 .llo_getattr   = lov_attr_get_raid0,
616         },
617         [LLT_RELEASED] = {
618                 .llo_init      = lov_init_released,
619                 .llo_delete    = lov_delete_empty,
620                 .llo_fini      = lov_fini_released,
621                 .llo_install   = lov_install_empty,
622                 .llo_print     = lov_print_released,
623                 .llo_page_init = lov_page_init_empty,
624                 .llo_lock_init = lov_lock_init_empty,
625                 .llo_io_init   = lov_io_init_released,
626                 .llo_getattr   = lov_attr_get_empty,
627         }
628 };
629
/**
 * Performs a double-dispatch based on the layout type of an object.
 *
 * Reads \a obj's lo_type once, asserts that it indexes lov_dispatch[], and
 * calls method \a op of the matching table entry with the remaining
 * arguments. The statement expression evaluates to the method's return
 * value. No lock is taken here.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
({                                                                      \
        struct lov_object                      *__obj = (obj);          \
        enum lov_layout_type                    __llt;                  \
                                                                        \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
})
642
643 /**
644  * Return lov_layout_type associated with a given lsm
645  */
646 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
647 {
648         if (lsm == NULL)
649                 return LLT_EMPTY;
650         if (lsm_is_released(lsm))
651                 return LLT_RELEASED;
652         return LLT_RAID0;
653 }
654
655 static inline void lov_conf_freeze(struct lov_object *lov)
656 {
657         if (lov->lo_owner != current)
658                 down_read(&lov->lo_type_guard);
659 }
660
661 static inline void lov_conf_thaw(struct lov_object *lov)
662 {
663         if (lov->lo_owner != current)
664                 up_read(&lov->lo_type_guard);
665 }
666
/**
 * Double-dispatch that optionally holds the layout type guard.
 *
 * When \a lock is non-zero, the call is bracketed by lov_conf_freeze() and
 * lov_conf_thaw(); otherwise it behaves like LOV_2DISPATCH_NOLOCK(). The
 * statement expression evaluates to the method's return value.
 *
 * Note that \a obj is expanded twice (once here and once inside
 * LOV_2DISPATCH_NOLOCK()), so it should not have side effects.
 */
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
({                                                                      \
        struct lov_object                      *__obj = (obj);          \
        int                                     __lock = !!(lock);      \
        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
                                                                        \
        if (__lock)                                                     \
                lov_conf_freeze(__obj);                                 \
        __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
        if (__lock)                                                     \
                lov_conf_thaw(__obj);                                   \
        __result;                                                       \
})
680
/**
 * Performs a locked double-dispatch based on the layout type of an object.
 *
 * Shorthand for LOV_2DISPATCH_MAYLOCK() with the lock argument set to 1.
 */
#define LOV_2DISPATCH(obj, op, ...)                     \
        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
686
/**
 * Locked double-dispatch for methods whose result is not used.
 *
 * Brackets the call with lov_conf_freeze() and lov_conf_thaw(), reads
 * \a obj's lo_type under that protection, asserts that it indexes
 * lov_dispatch[], and calls method \a op with the remaining arguments.
 * Expands to a statement, not an expression.
 */
#define LOV_2DISPATCH_VOID(obj, op, ...)                                \
do {                                                                    \
        struct lov_object                      *__obj = (obj);          \
        enum lov_layout_type                    __llt;                  \
                                                                        \
        lov_conf_freeze(__obj);                                         \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
        lov_conf_thaw(__obj);                                           \
} while (0)
698
699 static void lov_conf_lock(struct lov_object *lov)
700 {
701         LASSERT(lov->lo_owner != current);
702         down_write(&lov->lo_type_guard);
703         LASSERT(lov->lo_owner == NULL);
704         lov->lo_owner = current;
705 }
706
707 static void lov_conf_unlock(struct lov_object *lov)
708 {
709         lov->lo_owner = NULL;
710         up_write(&lov->lo_type_guard);
711 }
712
713 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
714 {
715         struct l_wait_info lwi = { 0 };
716         ENTRY;
717
718         while (atomic_read(&lov->lo_active_ios) > 0) {
719                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
720                         PFID(lu_object_fid(lov2lu(lov))),
721                         atomic_read(&lov->lo_active_ios));
722
723                 l_wait_event(lov->lo_waitq,
724                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
725         }
726         RETURN(0);
727 }
728
729 static int lov_layout_change(const struct lu_env *unused,
730                              struct lov_object *lov, struct lov_stripe_md *lsm,
731                              const struct cl_object_conf *conf)
732 {
733         enum lov_layout_type llt = lov_type(lsm);
734         union lov_layout_state *state = &lov->u;
735         const struct lov_layout_operations *old_ops;
736         const struct lov_layout_operations *new_ops;
737         struct lu_env *env;
738         __u16 refcheck;
739         int rc;
740         ENTRY;
741
742         LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
743
744         env = cl_env_get(&refcheck);
745         if (IS_ERR(env))
746                 RETURN(PTR_ERR(env));
747
748         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
749
750         CDEBUG(D_INODE, DFID" from %s to %s\n",
751                PFID(lu_object_fid(lov2lu(lov))),
752                llt2str(lov->lo_type), llt2str(llt));
753
754         old_ops = &lov_dispatch[lov->lo_type];
755         new_ops = &lov_dispatch[llt];
756
757         rc = cl_object_prune(env, &lov->lo_cl);
758         if (rc != 0)
759                 GOTO(out, rc);
760
761         rc = old_ops->llo_delete(env, lov, &lov->u);
762         if (rc != 0)
763                 GOTO(out, rc);
764
765         old_ops->llo_fini(env, lov, &lov->u);
766
767         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
768
769         lov->lo_type = LLT_EMPTY;
770
771         /* page bufsize fixup */
772         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
773                 lov_page_slice_fixup(lov, NULL);
774
775         rc = new_ops->llo_init(env, lov_object_dev(lov), lov, lsm, conf, state);
776         if (rc != 0) {
777                 new_ops->llo_delete(env, lov, state);
778                 new_ops->llo_fini(env, lov, state);
779                 /* this file becomes an EMPTY file. */
780                 GOTO(out, rc);
781         }
782
783         new_ops->llo_install(env, lov, state);
784         lov->lo_type = llt;
785
786 out:
787         cl_env_put(env, &refcheck);
788         RETURN(rc);
789 }
790
791 /*****************************************************************************
792  *
793  * Lov object operations.
794  *
795  */
796 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
797                     const struct lu_object_conf *conf)
798 {
799         struct lov_object            *lov   = lu2lov(obj);
800         struct lov_device            *dev   = lov_object_dev(lov);
801         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
802         union lov_layout_state       *set   = &lov->u;
803         const struct lov_layout_operations *ops;
804         struct lov_stripe_md *lsm = NULL;
805         int rc;
806         ENTRY;
807
808         init_rwsem(&lov->lo_type_guard);
809         atomic_set(&lov->lo_active_ios, 0);
810         init_waitqueue_head(&lov->lo_waitq);
811         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
812
813         lov->lo_type = LLT_EMPTY;
814         if (cconf->u.coc_layout.lb_buf != NULL) {
815                 lsm = lov_unpackmd(dev->ld_lov,
816                                    cconf->u.coc_layout.lb_buf,
817                                    cconf->u.coc_layout.lb_len);
818                 if (IS_ERR(lsm))
819                         RETURN(PTR_ERR(lsm));
820         }
821
822         /* no locking is necessary, as object is being created */
823         lov->lo_type = lov_type(lsm);
824         ops = &lov_dispatch[lov->lo_type];
825         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
826         if (rc != 0)
827                 GOTO(out_lsm, rc);
828
829         ops->llo_install(env, lov, set);
830
831 out_lsm:
832         lov_lsm_put(lsm);
833
834         RETURN(rc);
835 }
836
837 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
838                         const struct cl_object_conf *conf)
839 {
840         struct lov_stripe_md    *lsm = NULL;
841         struct lov_object       *lov = cl2lov(obj);
842         int                      result = 0;
843         ENTRY;
844
845         if (conf->coc_opc == OBJECT_CONF_SET &&
846             conf->u.coc_layout.lb_buf != NULL) {
847                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
848                                    conf->u.coc_layout.lb_buf,
849                                    conf->u.coc_layout.lb_len);
850                 if (IS_ERR(lsm))
851                         RETURN(PTR_ERR(lsm));
852         }
853
854         lov_conf_lock(lov);
855         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
856                 lov->lo_layout_invalid = true;
857                 GOTO(out, result = 0);
858         }
859
860         if (conf->coc_opc == OBJECT_CONF_WAIT) {
861                 if (lov->lo_layout_invalid &&
862                     atomic_read(&lov->lo_active_ios) > 0) {
863                         lov_conf_unlock(lov);
864                         result = lov_layout_wait(env, lov);
865                         lov_conf_lock(lov);
866                 }
867                 GOTO(out, result);
868         }
869
870         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
871
872         if ((lsm == NULL && lov->lo_lsm == NULL) ||
873             ((lsm != NULL && lov->lo_lsm != NULL) &&
874              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
875              (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
876                 /* same version of layout */
877                 lov->lo_layout_invalid = false;
878                 GOTO(out, result = 0);
879         }
880
881         /* will change layout - check if there still exists active IO. */
882         if (atomic_read(&lov->lo_active_ios) > 0) {
883                 lov->lo_layout_invalid = true;
884                 GOTO(out, result = -EBUSY);
885         }
886
887         result = lov_layout_change(env, lov, lsm, conf);
888         lov->lo_layout_invalid = result != 0;
889         EXIT;
890
891 out:
892         lov_conf_unlock(lov);
893         lov_lsm_put(lsm);
894         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
895                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
896         RETURN(result);
897 }
898
899 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
900 {
901         struct lov_object *lov = lu2lov(obj);
902
903         ENTRY;
904         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
905         EXIT;
906 }
907
908 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
909 {
910         struct lov_object *lov = lu2lov(obj);
911
912         ENTRY;
913         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
914         lu_object_fini(obj);
915         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
916         EXIT;
917 }
918
919 static int lov_object_print(const struct lu_env *env, void *cookie,
920                             lu_printer_t p, const struct lu_object *o)
921 {
922         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
923 }
924
925 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
926                   struct cl_page *page, pgoff_t index)
927 {
928         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
929                                     index);
930 }
931
932 /**
933  * Implements cl_object_operations::clo_io_init() method for lov
934  * layer. Dispatches to the appropriate layout io initialization method.
935  */
936 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
937                 struct cl_io *io)
938 {
939         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
940         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
941                                      !io->ci_ignore_layout, env, obj, io);
942 }
943
944 /**
945  * An implementation of cl_object_operations::clo_attr_get() method for lov
946  * layer. For raid0 layout this collects and merges attributes of all
947  * sub-objects.
948  */
949 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
950                         struct cl_attr *attr)
951 {
952         /* do not take lock, as this function is called under a
953          * spin-lock. Layout is protected from changing by ongoing IO. */
954         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
955 }
956
/**
 * Attribute update method of the lov layer.
 *
 * Nothing is dispatched and none of the arguments is looked at, as no layout
 * implements this operation.
 *
 * \retval 0    always
 */
static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
                           const struct cl_attr *attr, unsigned valid)
{
        /* no layout provides an attribute update hook */
        return 0;
}
965
966 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
967                   struct cl_lock *lock, const struct cl_io *io)
968 {
969         /* No need to lock because we've taken one refcount of layout.  */
970         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
971                                     io);
972 }
973
974 /**
975  * We calculate on which OST the mapping will end. If the length of mapping
976  * is greater than (stripe_size * stripe_count) then the last_stripe will
977  * will be one just before start_stripe. Else we check if the mapping
978  * intersects each OST and find last_stripe.
979  * This function returns the last_stripe and also sets the stripe_count
980  * over which the mapping is spread
981  *
982  * \param lsm [in]              striping information for the file
983  * \param fm_start [in]         logical start of mapping
984  * \param fm_end [in]           logical end of mapping
985  * \param start_stripe [in]     starting stripe of the mapping
986  * \param stripe_count [out]    the number of stripes across which to map is
987  *                              returned
988  *
989  * \retval last_stripe          return the last stripe of the mapping
990  */
991 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
992                                    loff_t fm_start, loff_t fm_end,
993                                    int start_stripe, int *stripe_count)
994 {
995         int last_stripe;
996         loff_t obd_start;
997         loff_t obd_end;
998         int i, j;
999
1000         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
1001                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
1002                                                               start_stripe - 1);
1003                 *stripe_count = lsm->lsm_stripe_count;
1004         } else {
1005                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
1006                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
1007                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1008                                                    &obd_start, &obd_end)) == 0)
1009                                 break;
1010                 }
1011                 *stripe_count = j;
1012                 last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count;
1013         }
1014
1015         return last_stripe;
1016 }
1017
1018 /**
1019  * Set fe_device and copy extents from local buffer into main return buffer.
1020  *
1021  * \param fiemap [out]          fiemap to hold all extents
1022  * \param lcl_fm_ext [in]       array of fiemap extents get from OSC layer
1023  * \param ost_index [in]        OST index to be written into the fm_device
1024  *                              field for each extent
1025  * \param ext_count [in]        number of extents to be copied
1026  * \param current_extent [in]   where to start copying in the extent array
1027  */
1028 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1029                                          struct fiemap_extent *lcl_fm_ext,
1030                                          int ost_index, unsigned int ext_count,
1031                                          int current_extent)
1032 {
1033         char            *to;
1034         unsigned int    ext;
1035
1036         for (ext = 0; ext < ext_count; ext++) {
1037                 lcl_fm_ext[ext].fe_device = ost_index;
1038                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1039         }
1040
1041         /* Copy fm_extent's from fm_local to return buffer */
1042         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1043         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
1044 }
1045
1046 #define FIEMAP_BUFFER_SIZE 4096
1047
1048 /**
1049  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1050  * call. The local end offset and the device are sent in the first
1051  * fm_extent. This function calculates the stripe number from the index.
1052  * This function returns a stripe_no on which mapping is to be restarted.
1053  *
1054  * This function returns fm_end_offset which is the in-OST offset at which
1055  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1056  * will re-calculate proper offset in next stripe.
1057  * Note that the first extent is passed to lov_get_info via the value field.
1058  *
1059  * \param fiemap [in]           fiemap request header
1060  * \param lsm [in]              striping information for the file
1061  * \param fm_start [in]         logical start of mapping
1062  * \param fm_end [in]           logical end of mapping
1063  * \param start_stripe [out]    starting stripe will be returned in this
1064  */
1065 static loff_t fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1066                                         struct lov_stripe_md *lsm,
1067                                         loff_t fm_start, loff_t fm_end,
1068                                         int *start_stripe)
1069 {
1070         loff_t local_end = fiemap->fm_extents[0].fe_logical;
1071         loff_t lun_start;
1072         loff_t lun_end;
1073         loff_t fm_end_offset;
1074         int stripe_no = -1;
1075         int i;
1076
1077         if (fiemap->fm_extent_count == 0 ||
1078             fiemap->fm_extents[0].fe_logical == 0)
1079                 return 0;
1080
1081         /* Find out stripe_no from ost_index saved in the fe_device */
1082         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1083                 struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
1084
1085                 if (lov_oinfo_is_dummy(oinfo))
1086                         continue;
1087
1088                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1089                         stripe_no = i;
1090                         break;
1091                 }
1092         }
1093
1094         if (stripe_no == -1)
1095                 return -EINVAL;
1096
1097         /* If we have finished mapping on previous device, shift logical
1098          * offset to start of next device */
1099         if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1100                                   &lun_start, &lun_end) != 0 &&
1101             local_end < lun_end) {
1102                 fm_end_offset = local_end;
1103                 *start_stripe = stripe_no;
1104         } else {
1105                 /* This is a special value to indicate that caller should
1106                  * calculate offset in next stripe. */
1107                 fm_end_offset = 0;
1108                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
1109         }
1110
1111         return fm_end_offset;
1112 }
1113
1114 /**
1115  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1116  * This also handles the restarting of FIEMAP calls in case mapping overflows
1117  * the available number of extents in single call.
1118  *
1119  * \param env [in]              lustre environment
1120  * \param obj [in]              file object
1121  * \param fmkey [in]            fiemap request header and other info
1122  * \param fiemap [out]          fiemap buffer holding retrived map extents
1123  * \param buflen [in/out]       max buffer length of @fiemap, when iterate
1124  *                              each OST, it is used to limit max map needed
1125  * \retval 0    success
1126  * \retval < 0  error
1127  */
1128 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1129                              struct ll_fiemap_info_key *fmkey,
1130                              struct fiemap *fiemap, size_t *buflen)
1131 {
1132         struct lov_stripe_md    *lsm;
1133         struct cl_object        *subobj = NULL;
1134         struct lov_obd          *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1135         struct fiemap           *fm_local = NULL;
1136         struct fiemap_extent    *lcl_fm_ext;
1137         loff_t                  fm_start;
1138         loff_t                  fm_end;
1139         loff_t                  fm_length;
1140         loff_t                  fm_end_offset;
1141         int                     count_local;
1142         int                     ost_index = 0;
1143         int                     start_stripe;
1144         int                     current_extent = 0;
1145         int                     rc = 0;
1146         int                     last_stripe;
1147         int                     cur_stripe = 0;
1148         int                     cur_stripe_wrap = 0;
1149         int                     stripe_count;
1150         unsigned int            buffer_size = FIEMAP_BUFFER_SIZE;
1151         /* Whether have we collected enough extents */
1152         bool                    enough = false;
1153         /* EOF for object */
1154         bool                    ost_eof = false;
1155         /* done with required mapping for this OST? */
1156         bool                    ost_done = false;
1157         ENTRY;
1158
1159         lsm = lov_lsm_addref(cl2lov(obj));
1160         if (lsm == NULL)
1161                 RETURN(-ENODATA);
1162
1163         /**
1164          * If the stripe_count > 1 and the application does not understand
1165          * DEVICE_ORDER flag, it cannot interpret the extents correctly.
1166          */
1167         if (lsm->lsm_stripe_count > 1 && !(fiemap->fm_flags &
1168                                            FIEMAP_FLAG_DEVICE_ORDER))
1169                 GOTO(out_lsm, rc = -ENOTSUPP);
1170
1171         if (lsm_is_released(lsm)) {
1172                 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1173                         /**
1174                          * released file, return a minimal FIEMAP if
1175                          * request fits in file-size.
1176                          */
1177                         fiemap->fm_mapped_extents = 1;
1178                         fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1179                         if (fiemap->fm_start + fiemap->fm_length <
1180                             fmkey->lfik_oa.o_size)
1181                                 fiemap->fm_extents[0].fe_length =
1182                                         fiemap->fm_length;
1183                         else
1184                                 fiemap->fm_extents[0].fe_length =
1185                                         fmkey->lfik_oa.o_size -
1186                                         fiemap->fm_start;
1187                         fiemap->fm_extents[0].fe_flags |=
1188                                 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1189                 }
1190                 GOTO(out_lsm, rc = 0);
1191         }
1192
1193         if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1194                 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1195
1196         OBD_ALLOC_LARGE(fm_local, buffer_size);
1197         if (fm_local == NULL)
1198                 GOTO(out_lsm, rc = -ENOMEM);
1199         lcl_fm_ext = &fm_local->fm_extents[0];
1200         count_local = fiemap_size_to_count(buffer_size);
1201
1202         fm_start = fiemap->fm_start;
1203         fm_length = fiemap->fm_length;
1204         /* Calculate start stripe, last stripe and length of mapping */
1205         start_stripe = lov_stripe_number(lsm, fm_start);
1206         fm_end = (fm_length == ~0ULL) ? fmkey->lfik_oa.o_size :
1207                                         fm_start + fm_length - 1;
1208         /* If fm_length != ~0ULL but fm_start_fm_length-1 exceeds file size */
1209         if (fm_end > fmkey->lfik_oa.o_size)
1210                 fm_end = fmkey->lfik_oa.o_size;
1211
1212         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
1213                                               start_stripe, &stripe_count);
1214         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
1215                                                   &start_stripe);
1216         if (fm_end_offset == -EINVAL)
1217                 GOTO(out_fm_local, rc = -EINVAL);
1218
1219         /**
1220          * Requested extent count exceeds the fiemap buffer size, shrink our
1221          * ambition.
1222          */
1223         if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1224                 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1225         if (fiemap->fm_extent_count == 0)
1226                 count_local = 0;
1227
1228         /* Check each stripe */
1229         for (cur_stripe = start_stripe; stripe_count > 0;
1230              --stripe_count,
1231              cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
1232                 loff_t req_fm_len; /* Stores length of required mapping */
1233                 loff_t len_mapped_single_call;
1234                 loff_t lun_start;
1235                 loff_t lun_end;
1236                 loff_t obd_object_end;
1237                 unsigned int ext_count;
1238
1239                 cur_stripe_wrap = cur_stripe;
1240
1241                 /* Find out range of mapping on this stripe */
1242                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
1243                                            &lun_start, &obd_object_end)) == 0)
1244                         continue;
1245
1246                 if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe]))
1247                         GOTO(out_fm_local, rc = -EIO);
1248
1249                 /* If this is a continuation FIEMAP call and we are on
1250                  * starting stripe then lun_start needs to be set to
1251                  * fm_end_offset */
1252                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
1253                         lun_start = fm_end_offset;
1254
1255                 if (fm_length != ~0ULL) {
1256                         /* Handle fm_start + fm_length overflow */
1257                         if (fm_start + fm_length < fm_start)
1258                                 fm_length = ~0ULL - fm_start;
1259                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
1260                                                      cur_stripe);
1261                 } else {
1262                         lun_end = ~0ULL;
1263                 }
1264
1265                 if (lun_start == lun_end)
1266                         continue;
1267
1268                 req_fm_len = obd_object_end - lun_start;
1269                 fm_local->fm_length = 0;
1270                 len_mapped_single_call = 0;
1271
1272                 /* find lobsub object */
1273                 subobj = lov_find_subobj(env, cl2lov(obj), lsm,
1274                                              cur_stripe);
1275                 if (IS_ERR(subobj))
1276                         GOTO(out_fm_local, rc = PTR_ERR(subobj));
1277                 /* If the output buffer is very large and the objects have many
1278                  * extents we may need to loop on a single OST repeatedly */
1279                 ost_eof = false;
1280                 ost_done = false;
1281                 do {
1282                         if (fiemap->fm_extent_count > 0) {
1283                                 /* Don't get too many extents. */
1284                                 if (current_extent + count_local >
1285                                     fiemap->fm_extent_count)
1286                                         count_local = fiemap->fm_extent_count -
1287                                                       current_extent;
1288                         }
1289
1290                         lun_start += len_mapped_single_call;
1291                         fm_local->fm_length = req_fm_len -
1292                                               len_mapped_single_call;
1293                         req_fm_len = fm_local->fm_length;
1294                         fm_local->fm_extent_count = enough ? 1 : count_local;
1295                         fm_local->fm_mapped_extents = 0;
1296                         fm_local->fm_flags = fiemap->fm_flags;
1297
1298                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
1299
1300                         if (ost_index < 0 ||
1301                             ost_index >= lov->desc.ld_tgt_count)
1302                                 GOTO(obj_put, rc = -EINVAL);
1303                         /* If OST is inactive, return extent with UNKNOWN
1304                          * flag. */
1305                         if (!lov->lov_tgts[ost_index]->ltd_active) {
1306                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
1307                                 fm_local->fm_mapped_extents = 1;
1308
1309                                 lcl_fm_ext[0].fe_logical = lun_start;
1310                                 lcl_fm_ext[0].fe_length = obd_object_end -
1311                                                           lun_start;
1312                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1313
1314                                 goto inactive_tgt;
1315                         }
1316
1317                         fm_local->fm_start = lun_start;
1318                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1319                         memcpy(&fmkey->lfik_fiemap, fm_local,
1320                                sizeof(*fm_local));
1321                         *buflen = fiemap_count_to_size(
1322                                                 fm_local->fm_extent_count);
1323
1324                         rc = cl_object_fiemap(env, subobj, fmkey, fm_local,
1325                                               buflen);
1326                         if (rc != 0)
1327                                 GOTO(obj_put, rc);
1328 inactive_tgt:
1329                         ext_count = fm_local->fm_mapped_extents;
1330                         if (ext_count == 0) {
1331                                 ost_done = true;
1332                                 /* If last stripe has hold at the end,
1333                                  * we need to return */
1334                                 if (cur_stripe_wrap == last_stripe) {
1335                                         fiemap->fm_mapped_extents = 0;
1336                                         goto finish;
1337                                 }
1338                                 break;
1339                         } else if (enough) {
1340                                 /*
1341                                  * We've collected enough extents and there are
1342                                  * more extents after it.
1343                                  */
1344                                 goto finish;
1345                         }
1346
1347                         /* If we just need num of extents, got to next device */
1348                         if (fiemap->fm_extent_count == 0) {
1349                                 current_extent += ext_count;
1350                                 break;
1351                         }
1352
1353                         /* prepare to copy retrived map extents */
1354                         len_mapped_single_call =
1355                                 lcl_fm_ext[ext_count - 1].fe_logical -
1356                                 lun_start + lcl_fm_ext[ext_count - 1].fe_length;
1357
1358                         /* Have we finished mapping on this device? */
1359                         if (req_fm_len <= len_mapped_single_call)
1360                                 ost_done = true;
1361
1362                         /* Clear the EXTENT_LAST flag which can be present on
1363                          * the last extent */
1364                         if (lcl_fm_ext[ext_count - 1].fe_flags &
1365                             FIEMAP_EXTENT_LAST)
1366                                 lcl_fm_ext[ext_count - 1].fe_flags &=
1367                                                         ~FIEMAP_EXTENT_LAST;
1368                         if (lov_stripe_size(lsm,
1369                                         lcl_fm_ext[ext_count - 1].fe_logical +
1370                                         lcl_fm_ext[ext_count - 1].fe_length,
1371                                         cur_stripe) >= fmkey->lfik_oa.o_size)
1372                                 ost_eof = true;
1373
1374                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
1375                                                      ost_index, ext_count,
1376                                                      current_extent);
1377                         current_extent += ext_count;
1378
1379                         /* Ran out of available extents? */
1380                         if (current_extent >= fiemap->fm_extent_count)
1381                                 enough = true;
1382                 } while (!ost_done && !ost_eof);
1383
1384                 cl_object_put(env, subobj);
1385                 subobj = NULL;
1386
1387                 if (cur_stripe_wrap == last_stripe)
1388                         goto finish;
1389         } /* for each stripe */
1390 finish:
1391         /* Indicate that we are returning device offsets unless file just has
1392          * single stripe */
1393         if (lsm->lsm_stripe_count > 1)
1394                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1395
1396         if (fiemap->fm_extent_count == 0)
1397                 goto skip_last_device_calc;
1398
1399         /* Check if we have reached the last stripe and whether mapping for that
1400          * stripe is done. */
1401         if ((cur_stripe_wrap == last_stripe) && (ost_done || ost_eof))
1402                 fiemap->fm_extents[current_extent - 1].fe_flags |=
1403                                                              FIEMAP_EXTENT_LAST;
1404 skip_last_device_calc:
1405         fiemap->fm_mapped_extents = current_extent;
1406 obj_put:
1407         if (subobj != NULL)
1408                 cl_object_put(env, subobj);
1409 out_fm_local:
1410         OBD_FREE_LARGE(fm_local, buffer_size);
1411
1412 out_lsm:
1413         lov_lsm_put(lsm);
1414
1415         return rc;
1416 }
1417
1418 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1419                                 struct lov_user_md __user *lum)
1420 {
1421         struct lov_object       *lov = cl2lov(obj);
1422         struct lov_stripe_md    *lsm;
1423         int                     rc = 0;
1424         ENTRY;
1425
1426         lsm = lov_lsm_addref(lov);
1427         if (lsm == NULL)
1428                 RETURN(-ENODATA);
1429
1430         rc = lov_getstripe(cl2lov(obj), lsm, lum);
1431         lov_lsm_put(lsm);
1432         RETURN(rc);
1433 }
1434
1435 static int lov_object_layout_get(const struct lu_env *env,
1436                                  struct cl_object *obj,
1437                                  struct cl_layout *cl)
1438 {
1439         struct lov_object *lov = cl2lov(obj);
1440         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1441         struct lu_buf *buf = &cl->cl_buf;
1442         ssize_t rc;
1443         ENTRY;
1444
1445         if (lsm == NULL) {
1446                 cl->cl_size = 0;
1447                 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1448
1449                 RETURN(0);
1450         }
1451
1452         cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
1453         cl->cl_layout_gen = lsm->lsm_layout_gen;
1454
1455         rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
1456         lov_lsm_put(lsm);
1457
1458         RETURN(rc < 0 ? rc : 0);
1459 }
1460
1461 static loff_t lov_object_maxbytes(struct cl_object *obj)
1462 {
1463         struct lov_object *lov = cl2lov(obj);
1464         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1465         loff_t maxbytes;
1466
1467         if (lsm == NULL)
1468                 return LLONG_MAX;
1469
1470         maxbytes = lsm->lsm_maxbytes;
1471
1472         lov_lsm_put(lsm);
1473
1474         return maxbytes;
1475 }
1476
1477 static const struct cl_object_operations lov_ops = {
1478         .coo_page_init    = lov_page_init,
1479         .coo_lock_init    = lov_lock_init,
1480         .coo_io_init      = lov_io_init,
1481         .coo_attr_get     = lov_attr_get,
1482         .coo_attr_update  = lov_attr_update,
1483         .coo_conf_set     = lov_conf_set,
1484         .coo_getstripe    = lov_object_getstripe,
1485         .coo_layout_get   = lov_object_layout_get,
1486         .coo_maxbytes     = lov_object_maxbytes,
1487         .coo_fiemap       = lov_object_fiemap,
1488 };
1489
1490 static const struct lu_object_operations lov_lu_obj_ops = {
1491         .loo_object_init      = lov_object_init,
1492         .loo_object_delete    = lov_object_delete,
1493         .loo_object_release   = NULL,
1494         .loo_object_free      = lov_object_free,
1495         .loo_object_print     = lov_object_print,
1496         .loo_object_invariant = NULL
1497 };
1498
1499 struct lu_object *lov_object_alloc(const struct lu_env *env,
1500                                    const struct lu_object_header *unused,
1501                                    struct lu_device *dev)
1502 {
1503         struct lov_object *lov;
1504         struct lu_object  *obj;
1505
1506         ENTRY;
1507         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1508         if (lov != NULL) {
1509                 obj = lov2lu(lov);
1510                 lu_object_init(obj, NULL, dev);
1511                 lov->lo_cl.co_ops = &lov_ops;
1512                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1513                 /*
1514                  * object io operation vector (cl_object::co_iop) is installed
1515                  * later in lov_object_init(), as different vectors are used
1516                  * for object with different layouts.
1517                  */
1518                 obj->lo_ops = &lov_lu_obj_ops;
1519         } else
1520                 obj = NULL;
1521         RETURN(obj);
1522 }
1523
1524 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1525 {
1526         struct lov_stripe_md *lsm = NULL;
1527
1528         lov_conf_freeze(lov);
1529         if (lov->lo_lsm != NULL) {
1530                 lsm = lsm_addref(lov->lo_lsm);
1531                 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1532                         lsm, atomic_read(&lsm->lsm_refc),
1533                         lov->lo_layout_invalid, current);
1534         }
1535         lov_conf_thaw(lov);
1536         return lsm;
1537 }
1538
1539 int lov_read_and_clear_async_rc(struct cl_object *clob)
1540 {
1541         struct lu_object *luobj;
1542         int rc = 0;
1543         ENTRY;
1544
1545         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1546                                  &lov_device_type);
1547         if (luobj != NULL) {
1548                 struct lov_object *lov = lu2lov(luobj);
1549
1550                 lov_conf_freeze(lov);
1551                 switch (lov->lo_type) {
1552                 case LLT_RAID0: {
1553                         struct lov_stripe_md *lsm;
1554                         int i;
1555
1556                         lsm = lov->lo_lsm;
1557                         LASSERT(lsm != NULL);
1558                         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1559                                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1560
1561                                 if (lov_oinfo_is_dummy(loi))
1562                                         continue;
1563
1564                                 if (loi->loi_ar.ar_rc && !rc)
1565                                         rc = loi->loi_ar.ar_rc;
1566                                 loi->loi_ar.ar_rc = 0;
1567                         }
1568                 }
1569                 case LLT_RELEASED:
1570                 case LLT_EMPTY:
1571                         break;
1572                 default:
1573                         LBUG();
1574                 }
1575                 lov_conf_thaw(lov);
1576         }
1577         RETURN(rc);
1578 }
1579 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1580
1581 /** @} lov */