Whamcloud - gitweb
LU-8998 lov: add composite layout unpacking
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * Implementation of cl_object for LOV layer.
33  *
34  *   Author: Nikita Danilov <nikita.danilov@sun.com>
35  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LOV
39
40 #include "lov_cl_internal.h"
41
42 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
43 {
44         return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
45 }
46
47 /** \addtogroup lov
48  *  @{
49  */
50
51 /*****************************************************************************
52  *
53  * Layout operations.
54  *
55  */
56
57 struct lov_layout_operations {
58         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
59                         struct lov_object *lov, struct lov_stripe_md *lsm,
60                         const struct cl_object_conf *conf,
61                         union lov_layout_state *state);
62         int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
63                            union lov_layout_state *state);
64         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
65                          union lov_layout_state *state);
66         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
67                             union lov_layout_state *state);
68         int  (*llo_print)(const struct lu_env *env, void *cookie,
69                           lu_printer_t p, const struct lu_object *o);
70         int  (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
71                               struct cl_page *page, pgoff_t index);
72         int  (*llo_lock_init)(const struct lu_env *env,
73                               struct cl_object *obj, struct cl_lock *lock,
74                               const struct cl_io *io);
75         int  (*llo_io_init)(const struct lu_env *env,
76                             struct cl_object *obj, struct cl_io *io);
77         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
78                             struct cl_attr *attr);
79 };
80
81 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
82
83 static void lov_lsm_put(struct lov_stripe_md *lsm)
84 {
85         if (lsm != NULL)
86                 lov_free_memmd(&lsm);
87 }
88
89 /*****************************************************************************
90  *
91  * Lov object layout operations.
92  *
93  */
94
95 static void lov_install_empty(const struct lu_env *env,
96                               struct lov_object *lov,
97                               union  lov_layout_state *state)
98 {
99         /*
100          * File without objects.
101          */
102 }
103
104 static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
105                           struct lov_object *lov, struct lov_stripe_md *lsm,
106                           const struct cl_object_conf *conf,
107                           union lov_layout_state *state)
108 {
109         return 0;
110 }
111
112 static void lov_install_raid0(const struct lu_env *env,
113                               struct lov_object *lov,
114                               union  lov_layout_state *state)
115 {
116 }
117
118 static struct cl_object *lov_sub_find(const struct lu_env *env,
119                                       struct cl_device *dev,
120                                       const struct lu_fid *fid,
121                                       const struct cl_object_conf *conf)
122 {
123         struct lu_object *o;
124
125         ENTRY;
126         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
127         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
128         RETURN(lu2cl(o));
129 }
130
131 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
132                         struct cl_object *stripe, struct lov_layout_raid0 *r0,
133                         int idx)
134 {
135         struct cl_object_header *hdr;
136         struct cl_object_header *subhdr;
137         struct cl_object_header *parent;
138         struct lov_oinfo        *oinfo;
139         int result;
140
141         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
142                 /* For sanity:test_206.
143                  * Do not leave the object in cache to avoid accessing
144                  * freed memory. This is because osc_object is referring to
145                  * lov_oinfo of lsm_stripe_data which will be freed due to
146                  * this failure. */
147                 cl_object_kill(env, stripe);
148                 cl_object_put(env, stripe);
149                 return -EIO;
150         }
151
152         hdr    = cl_object_header(lov2cl(lov));
153         subhdr = cl_object_header(stripe);
154
155         oinfo = lov->lo_lsm->lsm_entries[0]->lsme_oinfo[idx];
156         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
157                " idx: %d gen: %d\n",
158                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
159                PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
160                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
161
162         /* reuse ->coh_attr_guard to protect coh_parent change */
163         spin_lock(&subhdr->coh_attr_guard);
164         parent = subhdr->coh_parent;
165         if (parent == NULL) {
166                 subhdr->coh_parent = hdr;
167                 spin_unlock(&subhdr->coh_attr_guard);
168                 subhdr->coh_nesting = hdr->coh_nesting + 1;
169                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
170                 r0->lo_sub[idx] = cl2lovsub(stripe);
171                 r0->lo_sub[idx]->lso_super = lov;
172                 r0->lo_sub[idx]->lso_index = idx;
173                 result = 0;
174         } else {
175                 struct lu_object  *old_obj;
176                 struct lov_object *old_lov;
177                 unsigned int mask = D_INODE;
178
179                 spin_unlock(&subhdr->coh_attr_guard);
180                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
181                 LASSERT(old_obj != NULL);
182                 old_lov = cl2lov(lu2cl(old_obj));
183                 if (old_lov->lo_layout_invalid) {
184                         /* the object's layout has already changed but isn't
185                          * refreshed */
186                         lu_object_unhash(env, &stripe->co_lu);
187                         result = -EAGAIN;
188                 } else {
189                         mask = D_ERROR;
190                         result = -EIO;
191                 }
192
193                 LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
194                                 "stripe %d is already owned.", idx);
195                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
196                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
197                 cl_object_put(env, stripe);
198         }
199         return result;
200 }
201
202 static int lov_page_slice_fixup(struct lov_object *lov,
203                                 struct cl_object *stripe)
204 {
205         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
206         struct cl_object *o;
207
208         if (stripe == NULL)
209                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
210                        cfs_size_round(sizeof(struct lov_page));
211
212         cl_object_for_each(o, stripe)
213                 o->co_slice_off += hdr->coh_page_bufsize;
214
215         return cl_object_header(stripe)->coh_page_bufsize;
216 }
217
218 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
219                           struct lov_object *lov, struct lov_stripe_md *lsm,
220                           const struct cl_object_conf *conf,
221                           union lov_layout_state *state)
222 {
223         int result;
224         int i;
225
226         struct cl_object        *stripe;
227         struct lov_thread_info  *lti     = lov_env_info(env);
228         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
229         struct lu_fid           *ofid    = &lti->lti_fid;
230         struct lov_layout_raid0 *r0      = &state->raid0;
231
232         ENTRY;
233
234         if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
235                 dump_lsm(D_ERROR, lsm);
236                 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
237                          LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
238         }
239
240         LASSERT(lov->lo_lsm == NULL);
241         lov->lo_lsm = lsm_addref(lsm);
242         r0->lo_nr = lsm->lsm_entries[0]->lsme_stripe_count;
243         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
244
245         lov->lo_layout_invalid = true;
246
247         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
248         if (r0->lo_sub != NULL) {
249                 int psz = 0;
250
251                 result = 0;
252                 subconf->coc_inode = conf->coc_inode;
253                 spin_lock_init(&r0->lo_sub_lock);
254                 /*
255                  * Create stripe cl_objects.
256                  */
257                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
258                         struct cl_device *subdev;
259                         struct lov_oinfo *oinfo =
260                                         lsm->lsm_entries[0]->lsme_oinfo[i];
261                         int ost_idx = oinfo->loi_ost_idx;
262
263                         if (lov_oinfo_is_dummy(oinfo))
264                                 continue;
265
266                         result = ostid_to_fid(ofid, &oinfo->loi_oi,
267                                               oinfo->loi_ost_idx);
268                         if (result != 0)
269                                 GOTO(out, result);
270
271                         if (dev->ld_target[ost_idx] == NULL) {
272                                 CERROR("%s: OST %04x is not initialized\n",
273                                        lov2obd(dev->ld_lov)->obd_name, ost_idx);
274                                 GOTO(out, result = -EIO);
275                         }
276
277                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
278                         subconf->u.coc_oinfo = oinfo;
279                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
280                         /* In the function below, .hs_keycmp resolves to
281                          * lu_obj_hop_keycmp() */
282                         /* coverity[overrun-buffer-val] */
283                         stripe = lov_sub_find(env, subdev, ofid, subconf);
284                         if (!IS_ERR(stripe)) {
285                                 result = lov_init_sub(env, lov, stripe, r0, i);
286                                 if (result == -EAGAIN) { /* try again */
287                                         --i;
288                                         result = 0;
289                                         continue;
290                                 }
291                         } else {
292                                 result = PTR_ERR(stripe);
293                         }
294
295                         if (result == 0) {
296                                 int sz = lov_page_slice_fixup(lov, stripe);
297                                 LASSERT(ergo(psz > 0, psz == sz));
298                                 psz = sz;
299                         }
300                 }
301                 if (result == 0)
302                         cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
303         } else
304                 result = -ENOMEM;
305 out:
306         RETURN(result);
307 }
308
309 static int lov_init_released(const struct lu_env *env,
310                              struct lov_device *dev, struct lov_object *lov,
311                              struct lov_stripe_md *lsm,
312                              const struct cl_object_conf *conf,
313                              union lov_layout_state *state)
314 {
315         LASSERT(lsm != NULL);
316         LASSERT(lsm->lsm_is_released);
317         LASSERT(lov->lo_lsm == NULL);
318
319         lov->lo_lsm = lsm_addref(lsm);
320         return 0;
321 }
322
323 static struct cl_object *lov_find_subobj(const struct lu_env *env,
324                                          struct lov_object *lov,
325                                          struct lov_stripe_md *lsm,
326                                          int stripe_idx)
327 {
328         struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
329         struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[stripe_idx];
330         struct lov_thread_info  *lti = lov_env_info(env);
331         struct lu_fid           *ofid = &lti->lti_fid;
332         struct cl_device        *subdev;
333         int                     ost_idx;
334         int                     rc;
335         struct cl_object        *result;
336
337         if (lov->lo_type != LLT_RAID0)
338                 GOTO(out, result = NULL);
339
340         ost_idx = oinfo->loi_ost_idx;
341         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
342         if (rc != 0)
343                 GOTO(out, result = NULL);
344
345         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
346         result = lov_sub_find(env, subdev, ofid, NULL);
347 out:
348         if (result == NULL)
349                 result = ERR_PTR(-EINVAL);
350         return result;
351 }
352
353 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
354                             union lov_layout_state *state)
355 {
356         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
357
358         lov_layout_wait(env, lov);
359         return 0;
360 }
361
362 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
363                                struct lovsub_object *los, int idx)
364 {
365         struct cl_object        *sub;
366         struct lov_layout_raid0 *r0;
367         struct lu_site          *site;
368         struct lu_site_bkt_data *bkt;
369         wait_queue_t          *waiter;
370
371         r0  = &lov->u.raid0;
372         LASSERT(r0->lo_sub[idx] == los);
373
374         sub  = lovsub2cl(los);
375         site = sub->co_lu.lo_dev->ld_site;
376         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
377
378         cl_object_kill(env, sub);
379         /* release a reference to the sub-object and ... */
380         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
381         cl_object_put(env, sub);
382
383         /* ... wait until it is actually destroyed---sub-object clears its
384          * ->lo_sub[] slot in lovsub_object_fini() */
385         if (r0->lo_sub[idx] == los) {
386                 waiter = &lov_env_info(env)->lti_waiter;
387                 init_waitqueue_entry(waiter, current);
388                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
389                 set_current_state(TASK_UNINTERRUPTIBLE);
390                 while (1) {
391                         /* this wait-queue is signaled at the end of
392                          * lu_object_free(). */
393                         set_current_state(TASK_UNINTERRUPTIBLE);
394                         spin_lock(&r0->lo_sub_lock);
395                         if (r0->lo_sub[idx] == los) {
396                                 spin_unlock(&r0->lo_sub_lock);
397                                 schedule();
398                         } else {
399                                 spin_unlock(&r0->lo_sub_lock);
400                                 set_current_state(TASK_RUNNING);
401                                 break;
402                         }
403                 }
404                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
405         }
406         LASSERT(r0->lo_sub[idx] == NULL);
407 }
408
409 static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
410                             union lov_layout_state *state)
411 {
412         struct lov_layout_raid0 *r0 = &state->raid0;
413         struct lov_stripe_md    *lsm = lov->lo_lsm;
414         int i;
415
416         ENTRY;
417
418         dump_lsm(D_INODE, lsm);
419
420         lov_layout_wait(env, lov);
421         if (r0->lo_sub != NULL) {
422                 for (i = 0; i < r0->lo_nr; ++i) {
423                         struct lovsub_object *los = r0->lo_sub[i];
424
425                         if (los != NULL) {
426                                 cl_object_prune(env, &los->lso_cl);
427                                 /*
428                                  * If top-level object is to be evicted from
429                                  * the cache, so are its sub-objects.
430                                  */
431                                 lov_subobject_kill(env, lov, los, i);
432                         }
433                 }
434         }
435         RETURN(0);
436 }
437
438 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
439                            union lov_layout_state *state)
440 {
441         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
442 }
443
444 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
445                            union lov_layout_state *state)
446 {
447         struct lov_layout_raid0 *r0 = &state->raid0;
448         ENTRY;
449
450         if (r0->lo_sub != NULL) {
451                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
452                 r0->lo_sub = NULL;
453         }
454
455         dump_lsm(D_INODE, lov->lo_lsm);
456         lov_free_memmd(&lov->lo_lsm);
457
458         EXIT;
459 }
460
461 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
462                                 union lov_layout_state *state)
463 {
464         ENTRY;
465         dump_lsm(D_INODE, lov->lo_lsm);
466         lov_free_memmd(&lov->lo_lsm);
467         EXIT;
468 }
469
470 static int lov_print_empty(const struct lu_env *env, void *cookie,
471                            lu_printer_t p, const struct lu_object *o)
472 {
473         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
474         return 0;
475 }
476
477 static int lov_print_raid0(const struct lu_env *env, void *cookie,
478                            lu_printer_t p, const struct lu_object *o)
479 {
480         struct lov_object       *lov = lu2lov(o);
481         struct lov_layout_raid0 *r0  = lov_r0(lov);
482         struct lov_stripe_md    *lsm = lov->lo_lsm;
483         int                      i;
484
485         (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
486                 r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
487                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
488                 lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen);
489         for (i = 0; i < r0->lo_nr; ++i) {
490                 struct lu_object *sub;
491
492                 if (r0->lo_sub[i] != NULL) {
493                         sub = lovsub2lu(r0->lo_sub[i]);
494                         lu_object_print(env, cookie, p, sub);
495                 } else {
496                         (*p)(env, cookie, "sub %d absent\n", i);
497                 }
498         }
499         return 0;
500 }
501
502 static int lov_print_released(const struct lu_env *env, void *cookie,
503                                 lu_printer_t p, const struct lu_object *o)
504 {
505         struct lov_object       *lov = lu2lov(o);
506         struct lov_stripe_md    *lsm = lov->lo_lsm;
507
508         (*p)(env, cookie,
509                 "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
510                 lov->lo_layout_invalid ? "invalid" : "valid", lsm,
511                 lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
512                 lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen);
513         return 0;
514 }
515
516 /**
517  * Implements cl_object_operations::coo_attr_get() method for an object
518  * without stripes (LLT_EMPTY layout type).
519  *
520  * The only attributes this layer is authoritative in this case is
521  * cl_attr::cat_blocks---it's 0.
522  */
523 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
524                               struct cl_attr *attr)
525 {
526         attr->cat_blocks = 0;
527         return 0;
528 }
529
530 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
531                               struct cl_attr *attr)
532 {
533         struct lov_object       *lov = cl2lov(obj);
534         struct lov_layout_raid0 *r0 = lov_r0(lov);
535         struct cl_attr          *lov_attr = &r0->lo_attr;
536         int                      result = 0;
537
538         ENTRY;
539
540         /* this is called w/o holding type guard mutex, so it must be inside
541          * an on going IO otherwise lsm may be replaced.
542          * LU-2117: it turns out there exists one exception. For mmaped files,
543          * the lock of those files may be requested in the other file's IO
544          * context, and this function is called in ccc_lock_state(), it will
545          * hit this assertion.
546          * Anyway, it's still okay to call attr_get w/o type guard as layout
547          * can't go if locks exist. */
548         /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
549
550         if (!r0->lo_attr_valid) {
551                 struct lov_stripe_md    *lsm = lov->lo_lsm;
552                 struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
553                 __u64                    kms = 0;
554
555                 memset(lvb, 0, sizeof(*lvb));
556                 /* XXX: timestamps can be negative by sanity:test_39m,
557                  * how can it be? */
558                 lvb->lvb_atime = LLONG_MIN;
559                 lvb->lvb_ctime = LLONG_MIN;
560                 lvb->lvb_mtime = LLONG_MIN;
561
562                 /*
563                  * XXX that should be replaced with a loop over sub-objects,
564                  * doing cl_object_attr_get() on them. But for now, let's
565                  * reuse old lov code.
566                  */
567
568                 /*
569                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
570                  * happy. It's not needed, because new code uses
571                  * ->coh_attr_guard spin-lock to protect consistency of
572                  * sub-object attributes.
573                  */
574                 lov_stripe_lock(lsm);
575                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
576                 lov_stripe_unlock(lsm);
577                 if (result == 0) {
578                         cl_lvb2attr(lov_attr, lvb);
579                         lov_attr->cat_kms = kms;
580                         r0->lo_attr_valid = 1;
581                 }
582         }
583         if (result == 0) { /* merge results */
584                 attr->cat_blocks = lov_attr->cat_blocks;
585                 attr->cat_size = lov_attr->cat_size;
586                 attr->cat_kms = lov_attr->cat_kms;
587                 if (attr->cat_atime < lov_attr->cat_atime)
588                         attr->cat_atime = lov_attr->cat_atime;
589                 if (attr->cat_ctime < lov_attr->cat_ctime)
590                         attr->cat_ctime = lov_attr->cat_ctime;
591                 if (attr->cat_mtime < lov_attr->cat_mtime)
592                         attr->cat_mtime = lov_attr->cat_mtime;
593         }
594         RETURN(result);
595 }
596
597 const static struct lov_layout_operations lov_dispatch[] = {
598         [LLT_EMPTY] = {
599                 .llo_init      = lov_init_empty,
600                 .llo_delete    = lov_delete_empty,
601                 .llo_fini      = lov_fini_empty,
602                 .llo_install   = lov_install_empty,
603                 .llo_print     = lov_print_empty,
604                 .llo_page_init = lov_page_init_empty,
605                 .llo_lock_init = lov_lock_init_empty,
606                 .llo_io_init   = lov_io_init_empty,
607                 .llo_getattr   = lov_attr_get_empty,
608         },
609         [LLT_RAID0] = {
610                 .llo_init      = lov_init_raid0,
611                 .llo_delete    = lov_delete_raid0,
612                 .llo_fini      = lov_fini_raid0,
613                 .llo_install   = lov_install_raid0,
614                 .llo_print     = lov_print_raid0,
615                 .llo_page_init = lov_page_init_raid0,
616                 .llo_lock_init = lov_lock_init_raid0,
617                 .llo_io_init   = lov_io_init_raid0,
618                 .llo_getattr   = lov_attr_get_raid0,
619         },
620         [LLT_RELEASED] = {
621                 .llo_init      = lov_init_released,
622                 .llo_delete    = lov_delete_empty,
623                 .llo_fini      = lov_fini_released,
624                 .llo_install   = lov_install_empty,
625                 .llo_print     = lov_print_released,
626                 .llo_page_init = lov_page_init_empty,
627                 .llo_lock_init = lov_lock_init_empty,
628                 .llo_io_init   = lov_io_init_released,
629                 .llo_getattr   = lov_attr_get_empty,
630         }
631 };
632
633 /**
634  * Performs a double-dispatch based on the layout type of an object.
635  */
636 #define LOV_2DISPATCH_NOLOCK(obj, op, ...)              \
637 ({                                                      \
638         struct lov_object *__obj = (obj);               \
639         enum lov_layout_type __llt;                     \
640                                                         \
641         __llt = __obj->lo_type;                         \
642         LASSERT(__llt < ARRAY_SIZE(lov_dispatch));      \
643         lov_dispatch[__llt].op(__VA_ARGS__);            \
644 })
645
646 /**
647  * Return lov_layout_type associated with a given lsm
648  */
649 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
650 {
651         if (lsm == NULL)
652                 return LLT_EMPTY;
653
654         if (lsm->lsm_magic == LOV_MAGIC_COMP_V1)
655                 return LLT_EMPTY;
656
657         if (lsm->lsm_is_released)
658                 return LLT_RELEASED;
659
660         return LLT_RAID0;
661 }
662
663 static inline void lov_conf_freeze(struct lov_object *lov)
664 {
665         CDEBUG(D_INODE, "To take share lov(%p) owner %p/%p\n",
666                 lov, lov->lo_owner, current);
667         if (lov->lo_owner != current)
668                 down_read(&lov->lo_type_guard);
669 }
670
671 static inline void lov_conf_thaw(struct lov_object *lov)
672 {
673         CDEBUG(D_INODE, "To release share lov(%p) owner %p/%p\n",
674                 lov, lov->lo_owner, current);
675         if (lov->lo_owner != current)
676                 up_read(&lov->lo_type_guard);
677 }
678
679 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
680 ({                                                                      \
681         struct lov_object                      *__obj = (obj);          \
682         int                                     __lock = !!(lock);      \
683         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
684                                                                         \
685         if (__lock)                                                     \
686                 lov_conf_freeze(__obj);                                 \
687         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
688         if (__lock)                                                     \
689                 lov_conf_thaw(__obj);                                   \
690         __result;                                                       \
691 })
692
693 /**
694  * Performs a locked double-dispatch based on the layout type of an object.
695  */
696 #define LOV_2DISPATCH(obj, op, ...)                     \
697         LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
698
699 #define LOV_2DISPATCH_VOID(obj, op, ...)                                \
700 do {                                                                    \
701         struct lov_object                      *__obj = (obj);          \
702         enum lov_layout_type                    __llt;                  \
703                                                                         \
704         lov_conf_freeze(__obj);                                         \
705         __llt = __obj->lo_type;                                         \
706         LASSERT(__llt < ARRAY_SIZE(lov_dispatch));                      \
707         lov_dispatch[__llt].op(__VA_ARGS__);                            \
708         lov_conf_thaw(__obj);                                           \
709 } while (0)
710
711 static void lov_conf_lock(struct lov_object *lov)
712 {
713         LASSERT(lov->lo_owner != current);
714         down_write(&lov->lo_type_guard);
715         LASSERT(lov->lo_owner == NULL);
716         lov->lo_owner = current;
717         CDEBUG(D_INODE, "Took exclusive lov(%p) owner %p\n",
718                 lov, lov->lo_owner);
719 }
720
721 static void lov_conf_unlock(struct lov_object *lov)
722 {
723         CDEBUG(D_INODE, "To release exclusive lov(%p) owner %p\n",
724                 lov, lov->lo_owner);
725         lov->lo_owner = NULL;
726         up_write(&lov->lo_type_guard);
727 }
728
729 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
730 {
731         struct l_wait_info lwi = { 0 };
732         ENTRY;
733
734         while (atomic_read(&lov->lo_active_ios) > 0) {
735                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
736                         PFID(lu_object_fid(lov2lu(lov))),
737                         atomic_read(&lov->lo_active_ios));
738
739                 l_wait_event(lov->lo_waitq,
740                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
741         }
742         RETURN(0);
743 }
744
745 static int lov_layout_change(const struct lu_env *unused,
746                              struct lov_object *lov, struct lov_stripe_md *lsm,
747                              const struct cl_object_conf *conf)
748 {
749         enum lov_layout_type llt = lov_type(lsm);
750         union lov_layout_state *state = &lov->u;
751         const struct lov_layout_operations *old_ops;
752         const struct lov_layout_operations *new_ops;
753         struct lov_device *lov_dev = lov_object_dev(lov);
754         struct lu_env *env;
755         __u16 refcheck;
756         int rc;
757         ENTRY;
758
759         LASSERT(lov->lo_type < ARRAY_SIZE(lov_dispatch));
760
761         env = cl_env_get(&refcheck);
762         if (IS_ERR(env))
763                 RETURN(PTR_ERR(env));
764
765         LASSERT(llt < ARRAY_SIZE(lov_dispatch));
766
767         CDEBUG(D_INODE, DFID" from %s to %s\n",
768                PFID(lu_object_fid(lov2lu(lov))),
769                llt2str(lov->lo_type), llt2str(llt));
770
771         old_ops = &lov_dispatch[lov->lo_type];
772         new_ops = &lov_dispatch[llt];
773
774         rc = cl_object_prune(env, &lov->lo_cl);
775         if (rc != 0)
776                 GOTO(out, rc);
777
778         rc = old_ops->llo_delete(env, lov, &lov->u);
779         if (rc != 0)
780                 GOTO(out, rc);
781
782         old_ops->llo_fini(env, lov, &lov->u);
783
784         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
785
786         CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
787                PFID(lu_object_fid(lov2lu(lov))), lov, llt);
788
789         lov->lo_type = LLT_EMPTY;
790
791         /* page bufsize fixup */
792         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
793                 lov_page_slice_fixup(lov, NULL);
794
795         rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
796         if (rc != 0) {
797                 struct obd_device *obd = lov2obd(lov_dev->ld_lov);
798
799                 CERROR("%s: cannot apply new layout on "DFID" : rc = %d\n",
800                        obd->obd_name, PFID(lu_object_fid(lov2lu(lov))), rc);
801                 new_ops->llo_delete(env, lov, state);
802                 new_ops->llo_fini(env, lov, state);
803                 /* this file becomes an EMPTY file. */
804                 GOTO(out, rc);
805         }
806
807         new_ops->llo_install(env, lov, state);
808         lov->lo_type = llt;
809
810 out:
811         cl_env_put(env, &refcheck);
812         RETURN(rc);
813 }
814
815 /*****************************************************************************
816  *
817  * Lov object operations.
818  *
819  */
820 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
821                     const struct lu_object_conf *conf)
822 {
823         struct lov_object            *lov   = lu2lov(obj);
824         struct lov_device            *dev   = lov_object_dev(lov);
825         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
826         union lov_layout_state       *set   = &lov->u;
827         const struct lov_layout_operations *ops;
828         struct lov_stripe_md *lsm = NULL;
829         int rc;
830         ENTRY;
831
832         init_rwsem(&lov->lo_type_guard);
833         atomic_set(&lov->lo_active_ios, 0);
834         init_waitqueue_head(&lov->lo_waitq);
835         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
836
837         lov->lo_type = LLT_EMPTY;
838         if (cconf->u.coc_layout.lb_buf != NULL) {
839                 lsm = lov_unpackmd(dev->ld_lov,
840                                    cconf->u.coc_layout.lb_buf,
841                                    cconf->u.coc_layout.lb_len);
842                 if (IS_ERR(lsm))
843                         RETURN(PTR_ERR(lsm));
844         }
845
846         /* no locking is necessary, as object is being created */
847         lov->lo_type = lov_type(lsm);
848         ops = &lov_dispatch[lov->lo_type];
849         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
850         if (rc != 0)
851                 GOTO(out_lsm, rc);
852
853         ops->llo_install(env, lov, set);
854
855 out_lsm:
856         lov_lsm_put(lsm);
857
858         RETURN(rc);
859 }
860
861 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
862                         const struct cl_object_conf *conf)
863 {
864         struct lov_stripe_md    *lsm = NULL;
865         struct lov_object       *lov = cl2lov(obj);
866         int                      result = 0;
867         ENTRY;
868
869         if (conf->coc_opc == OBJECT_CONF_SET &&
870             conf->u.coc_layout.lb_buf != NULL) {
871                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
872                                    conf->u.coc_layout.lb_buf,
873                                    conf->u.coc_layout.lb_len);
874                 if (IS_ERR(lsm))
875                         RETURN(PTR_ERR(lsm));
876         }
877
878         lov_conf_lock(lov);
879         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
880                 lov->lo_layout_invalid = true;
881                 GOTO(out, result = 0);
882         }
883
884         if (conf->coc_opc == OBJECT_CONF_WAIT) {
885                 if (lov->lo_layout_invalid &&
886                     atomic_read(&lov->lo_active_ios) > 0) {
887                         lov_conf_unlock(lov);
888                         result = lov_layout_wait(env, lov);
889                         lov_conf_lock(lov);
890                 }
891                 GOTO(out, result);
892         }
893
894         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
895
896         if ((lsm == NULL && lov->lo_lsm == NULL) ||
897             ((lsm != NULL && lov->lo_lsm != NULL) &&
898              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
899              (lov->lo_lsm->lsm_entries[0]->lsme_pattern ==
900               lsm->lsm_entries[0]->lsme_pattern))) {
901                 /* same version of layout */
902                 lov->lo_layout_invalid = false;
903                 GOTO(out, result = 0);
904         }
905
906         /* will change layout - check if there still exists active IO. */
907         if (atomic_read(&lov->lo_active_ios) > 0) {
908                 lov->lo_layout_invalid = true;
909                 GOTO(out, result = -EBUSY);
910         }
911
912         result = lov_layout_change(env, lov, lsm, conf);
913         lov->lo_layout_invalid = result != 0;
914         EXIT;
915
916 out:
917         lov_conf_unlock(lov);
918         lov_lsm_put(lsm);
919         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
920                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
921         RETURN(result);
922 }
923
924 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
925 {
926         struct lov_object *lov = lu2lov(obj);
927
928         ENTRY;
929         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
930         EXIT;
931 }
932
933 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
934 {
935         struct lov_object *lov = lu2lov(obj);
936
937         ENTRY;
938         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
939         lu_object_fini(obj);
940         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
941         EXIT;
942 }
943
944 static int lov_object_print(const struct lu_env *env, void *cookie,
945                             lu_printer_t p, const struct lu_object *o)
946 {
947         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
948 }
949
950 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
951                   struct cl_page *page, pgoff_t index)
952 {
953         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
954                                     index);
955 }
956
957 /**
958  * Implements cl_object_operations::clo_io_init() method for lov
959  * layer. Dispatches to the appropriate layout io initialization method.
960  */
961 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
962                 struct cl_io *io)
963 {
964         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
965
966         CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
967                PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
968                io->ci_ignore_layout, io->ci_verify_layout);
969
970         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
971                                      !io->ci_ignore_layout, env, obj, io);
972 }
973
974 /**
975  * An implementation of cl_object_operations::clo_attr_get() method for lov
976  * layer. For raid0 layout this collects and merges attributes of all
977  * sub-objects.
978  */
979 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
980                         struct cl_attr *attr)
981 {
982         /* do not take lock, as this function is called under a
983          * spin-lock. Layout is protected from changing by ongoing IO. */
984         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
985 }
986
987 static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
988                            const struct cl_attr *attr, unsigned valid)
989 {
990         /*
991          * No dispatch is required here, as no layout implements this.
992          */
993         return 0;
994 }
995
996 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
997                   struct cl_lock *lock, const struct cl_io *io)
998 {
999         /* No need to lock because we've taken one refcount of layout.  */
1000         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
1001                                     io);
1002 }
1003
1004 /**
1005  * We calculate on which OST the mapping will end. If the length of mapping
1006  * is greater than (stripe_size * stripe_count) then the last_stripe will
1007  * will be one just before start_stripe. Else we check if the mapping
1008  * intersects each OST and find last_stripe.
1009  * This function returns the last_stripe and also sets the stripe_count
1010  * over which the mapping is spread
1011  *
1012  * \param lsm [in]              striping information for the file
1013  * \param fm_start [in]         logical start of mapping
1014  * \param fm_end [in]           logical end of mapping
1015  * \param start_stripe [in]     starting stripe of the mapping
1016  * \param stripe_count [out]    the number of stripes across which to map is
1017  *                              returned
1018  *
1019  * \retval last_stripe          return the last stripe of the mapping
1020  */
1021 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
1022                                    u64 fm_start, u64 fm_end,
1023                                    int start_stripe, int *stripe_count)
1024 {
1025         int last_stripe;
1026         u64 obd_start;
1027         u64 obd_end;
1028         int i, j;
1029
1030         if (fm_end - fm_start > lsm->lsm_entries[0]->lsme_stripe_size *
1031                                 lsm->lsm_entries[0]->lsme_stripe_count) {
1032                 last_stripe = (start_stripe < 1 ?
1033                                lsm->lsm_entries[0]->lsme_stripe_count - 1 :
1034                                start_stripe - 1);
1035                 *stripe_count = lsm->lsm_entries[0]->lsme_stripe_count;
1036         } else {
1037                 for (j = 0, i = start_stripe;
1038                      j < lsm->lsm_entries[0]->lsme_stripe_count;
1039                      i = (i + 1) % lsm->lsm_entries[0]->lsme_stripe_count,
1040                      j++) {
1041                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
1042                                                    &obd_start, &obd_end)) == 0)
1043                                 break;
1044                 }
1045                 *stripe_count = j;
1046                 last_stripe = (start_stripe + j - 1) %
1047                               lsm->lsm_entries[0]->lsme_stripe_count;
1048         }
1049
1050         return last_stripe;
1051 }
1052
1053 /**
1054  * Set fe_device and copy extents from local buffer into main return buffer.
1055  *
1056  * \param fiemap [out]          fiemap to hold all extents
1057  * \param lcl_fm_ext [in]       array of fiemap extents get from OSC layer
1058  * \param ost_index [in]        OST index to be written into the fm_device
1059  *                              field for each extent
1060  * \param ext_count [in]        number of extents to be copied
1061  * \param current_extent [in]   where to start copying in the extent array
1062  */
1063 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1064                                          struct fiemap_extent *lcl_fm_ext,
1065                                          int ost_index, unsigned int ext_count,
1066                                          int current_extent)
1067 {
1068         char            *to;
1069         unsigned int    ext;
1070
1071         for (ext = 0; ext < ext_count; ext++) {
1072                 lcl_fm_ext[ext].fe_device = ost_index;
1073                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1074         }
1075
1076         /* Copy fm_extent's from fm_local to return buffer */
1077         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1078         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
1079 }
1080
1081 #define FIEMAP_BUFFER_SIZE 4096
1082
1083 /**
1084  * Non-zero fe_logical indicates that this is a continuation FIEMAP
1085  * call. The local end offset and the device are sent in the first
1086  * fm_extent. This function calculates the stripe number from the index.
1087  * This function returns a stripe_no on which mapping is to be restarted.
1088  *
1089  * This function returns fm_end_offset which is the in-OST offset at which
1090  * mapping should be restarted. If fm_end_offset=0 is returned then caller
1091  * will re-calculate proper offset in next stripe.
1092  * Note that the first extent is passed to lov_get_info via the value field.
1093  *
1094  * \param fiemap [in]           fiemap request header
1095  * \param lsm [in]              striping information for the file
1096  * \param fm_start [in]         logical start of mapping
1097  * \param fm_end [in]           logical end of mapping
1098  * \param start_stripe [out]    starting stripe will be returned in this
1099  */
1100 static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1101                                      struct lov_stripe_md *lsm,
1102                                      u64 fm_start, u64 fm_end,
1103                                      int *start_stripe)
1104 {
1105         u64 local_end = fiemap->fm_extents[0].fe_logical;
1106         u64 lun_start;
1107         u64 lun_end;
1108         u64 fm_end_offset;
1109         int stripe_no = -1;
1110         int i;
1111
1112         if (fiemap->fm_extent_count == 0 ||
1113             fiemap->fm_extents[0].fe_logical == 0)
1114                 return 0;
1115
1116         /* Find out stripe_no from ost_index saved in the fe_device */
1117         for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
1118                 struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[i];
1119
1120                 if (lov_oinfo_is_dummy(oinfo))
1121                         continue;
1122
1123                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1124                         stripe_no = i;
1125                         break;
1126                 }
1127         }
1128
1129         if (stripe_no == -1)
1130                 return -EINVAL;
1131
1132         /* If we have finished mapping on previous device, shift logical
1133          * offset to start of next device */
1134         if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
1135                                   &lun_start, &lun_end) != 0 &&
1136             local_end < lun_end) {
1137                 fm_end_offset = local_end;
1138                 *start_stripe = stripe_no;
1139         } else {
1140                 /* This is a special value to indicate that caller should
1141                  * calculate offset in next stripe. */
1142                 fm_end_offset = 0;
1143                 *start_stripe = (stripe_no + 1) %
1144                                 lsm->lsm_entries[0]->lsme_stripe_count;
1145         }
1146
1147         return fm_end_offset;
1148 }
1149
1150 struct fiemap_state {
1151         struct fiemap   *fs_fm;
1152         u64             fs_start;
1153         u64             fs_length;
1154         u64             fs_end;
1155         u64             fs_end_offset;
1156         int             fs_cur_extent;
1157         int             fs_cnt_need;
1158         int             fs_start_stripe;
1159         int             fs_last_stripe;
1160         bool            fs_device_done;
1161         bool            fs_finish;
1162         bool            fs_enough;
1163 };
1164
1165 int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
1166                       struct lov_stripe_md *lsm,
1167                       struct fiemap *fiemap, size_t *buflen,
1168                       struct ll_fiemap_info_key *fmkey, int stripeno,
1169                       struct fiemap_state *fs)
1170 {
1171         struct cl_object *subobj;
1172         struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1173         struct fiemap_extent *fm_ext = &fs->fs_fm->fm_extents[0];
1174         u64 req_fm_len; /* Stores length of required mapping */
1175         u64 len_mapped_single_call;
1176         u64 lun_start;
1177         u64 lun_end;
1178         u64 obd_object_end;
1179         unsigned int ext_count;
1180         /* EOF for object */
1181         bool ost_eof = false;
1182         /* done with required mapping for this OST? */
1183         bool ost_done = false;
1184         int ost_index;
1185         int rc = 0;
1186
1187         fs->fs_device_done = false;
1188         /* Find out range of mapping on this stripe */
1189         if ((lov_stripe_intersects(lsm, stripeno, fs->fs_start, fs->fs_end,
1190                                    &lun_start, &obd_object_end)) == 0)
1191                 return 0;
1192
1193         if (lov_oinfo_is_dummy(lsm->lsm_entries[0]->lsme_oinfo[stripeno]))
1194                 return -EIO;
1195
1196         /* If this is a continuation FIEMAP call and we are on
1197          * starting stripe then lun_start needs to be set to
1198          * end_offset */
1199         if (fs->fs_end_offset != 0 && stripeno == fs->fs_start_stripe)
1200                 lun_start = fs->fs_end_offset;
1201
1202         lun_end = fs->fs_length;
1203         if (lun_end != ~0ULL) {
1204                 /* Handle fs->fs_start + fs->fs_length overflow */
1205                 if (fs->fs_start + fs->fs_length < fs->fs_start)
1206                         fs->fs_length = ~0ULL - fs->fs_start;
1207                 lun_end = lov_size_to_stripe(lsm, fs->fs_start + fs->fs_length,
1208                                              stripeno);
1209         }
1210
1211         if (lun_start == lun_end)
1212                 return 0;
1213
1214         req_fm_len = obd_object_end - lun_start;
1215         fs->fs_fm->fm_length = 0;
1216         len_mapped_single_call = 0;
1217
1218         /* find lobsub object */
1219         subobj = lov_find_subobj(env, cl2lov(obj), lsm, stripeno);
1220         if (IS_ERR(subobj))
1221                 return PTR_ERR(subobj);
1222         /* If the output buffer is very large and the objects have many
1223          * extents we may need to loop on a single OST repeatedly */
1224         do {
1225                 if (fiemap->fm_extent_count > 0) {
1226                         /* Don't get too many extents. */
1227                         if (fs->fs_cur_extent + fs->fs_cnt_need >
1228                             fiemap->fm_extent_count)
1229                                 fs->fs_cnt_need = fiemap->fm_extent_count -
1230                                                   fs->fs_cur_extent;
1231                 }
1232
1233                 lun_start += len_mapped_single_call;
1234                 fs->fs_fm->fm_length = req_fm_len - len_mapped_single_call;
1235                 req_fm_len = fs->fs_fm->fm_length;
1236                 fs->fs_fm->fm_extent_count = fs->fs_enough ?
1237                                              1 : fs->fs_cnt_need;
1238                 fs->fs_fm->fm_mapped_extents = 0;
1239                 fs->fs_fm->fm_flags = fiemap->fm_flags;
1240
1241                 ost_index = lsm->lsm_entries[0]->lsme_oinfo[stripeno]->
1242                                                                 loi_ost_idx;
1243
1244                 if (ost_index < 0 || ost_index >= lov->desc.ld_tgt_count)
1245                         GOTO(obj_put, rc = -EINVAL);
1246                 /* If OST is inactive, return extent with UNKNOWN flag. */
1247                 if (!lov->lov_tgts[ost_index]->ltd_active) {
1248                         fs->fs_fm->fm_flags |= FIEMAP_EXTENT_LAST;
1249                         fs->fs_fm->fm_mapped_extents = 1;
1250
1251                         fm_ext[0].fe_logical = lun_start;
1252                         fm_ext[0].fe_length = obd_object_end - lun_start;
1253                         fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
1254
1255                         goto inactive_tgt;
1256                 }
1257
1258                 fs->fs_fm->fm_start = lun_start;
1259                 fs->fs_fm->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1260                 memcpy(&fmkey->lfik_fiemap, fs->fs_fm, sizeof(*fs->fs_fm));
1261                 *buflen = fiemap_count_to_size(fs->fs_fm->fm_extent_count);
1262
1263                 rc = cl_object_fiemap(env, subobj, fmkey, fs->fs_fm, buflen);
1264                 if (rc != 0)
1265                         GOTO(obj_put, rc);
1266 inactive_tgt:
1267                 ext_count = fs->fs_fm->fm_mapped_extents;
1268                 if (ext_count == 0) {
1269                         ost_done = true;
1270                         fs->fs_device_done = true;
1271                         /* If last stripe has hold at the end,
1272                          * we need to return */
1273                         if (stripeno == fs->fs_last_stripe) {
1274                                 fiemap->fm_mapped_extents = 0;
1275                                 fs->fs_finish = true;
1276                                 GOTO(obj_put, rc);
1277                         }
1278                         break;
1279                 } else if (fs->fs_enough) {
1280                         /*
1281                          * We've collected enough extents and there are
1282                          * more extents after it.
1283                          */
1284                         fs->fs_finish = true;
1285                         GOTO(obj_put, rc);
1286                 }
1287
1288                 /* If we just need num of extents, got to next device */
1289                 if (fiemap->fm_extent_count == 0) {
1290                         fs->fs_cur_extent += ext_count;
1291                         break;
1292                 }
1293
1294                 /* prepare to copy retrived map extents */
1295                 len_mapped_single_call = fm_ext[ext_count - 1].fe_logical +
1296                                          fm_ext[ext_count - 1].fe_length -
1297                                          lun_start;
1298
1299                 /* Have we finished mapping on this device? */
1300                 if (req_fm_len <= len_mapped_single_call) {
1301                         ost_done = true;
1302                         fs->fs_device_done = true;
1303                 }
1304
1305                 /* Clear the EXTENT_LAST flag which can be present on
1306                  * the last extent */
1307                 if (fm_ext[ext_count - 1].fe_flags & FIEMAP_EXTENT_LAST)
1308                         fm_ext[ext_count - 1].fe_flags &= ~FIEMAP_EXTENT_LAST;
1309                 if (lov_stripe_size(lsm, fm_ext[ext_count - 1].fe_logical +
1310                                          fm_ext[ext_count - 1].fe_length,
1311                                     stripeno) >= fmkey->lfik_oa.o_size) {
1312                         ost_eof = true;
1313                         fs->fs_device_done = true;
1314                 }
1315
1316                 fiemap_prepare_and_copy_exts(fiemap, fm_ext, ost_index,
1317                                              ext_count, fs->fs_cur_extent);
1318                 fs->fs_cur_extent += ext_count;
1319
1320                 /* Ran out of available extents? */
1321                 if (fs->fs_cur_extent >= fiemap->fm_extent_count)
1322                         fs->fs_enough = true;
1323         } while (!ost_done && !ost_eof);
1324
1325         if (stripeno == fs->fs_last_stripe)
1326                 fs->fs_finish = true;
1327 obj_put:
1328         cl_object_put(env, subobj);
1329
1330         return rc;
1331 }
1332
1333 /**
1334  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1335  * This also handles restarting FIEMAP calls in case the mapping overflows
1336  * the number of extents available in a single call (see the sketch below).
1337  *
1338  * \param env [in]              lustre environment
1339  * \param obj [in]              file object
1340  * \param fmkey [in]            fiemap request header and other info
1341  * \param fiemap [out]          fiemap buffer holding retrieved map extents
1342  * \param buflen [in/out]       max buffer length of @fiemap; when iterating
1343  *                              over each OST it limits the max mapping needed
1344  * \retval 0    success
1345  * \retval < 0  error
1346  */
1347 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1348                              struct ll_fiemap_info_key *fmkey,
1349                              struct fiemap *fiemap, size_t *buflen)
1350 {
1351         struct lov_stripe_md    *lsm;
1352         struct fiemap           *fm_local = NULL;
1353         int                     cur_stripe;
1354         int                     stripe_count;
1355         unsigned int            buffer_size = FIEMAP_BUFFER_SIZE;
1356         int                     rc = 0;
1357         struct fiemap_state fs = { 0 };
1358         ENTRY;
1359
1360         lsm = lov_lsm_addref(cl2lov(obj));
1361         if (lsm == NULL)
1362                 RETURN(-ENODATA);
1363
1364         /**
1365          * If the stripe_count > 1 and the application does not understand
1366          * the DEVICE_ORDER flag, it cannot interpret the extents correctly.
1367          */
1368         if (lsm->lsm_entries[0]->lsme_stripe_count > 1 &&
1369             !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1370                 GOTO(out_lsm, rc = -ENOTSUPP);
1371
1372         if (lsm->lsm_is_released) {
1373                 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1374                         /**
1375                          * Released file: return a minimal FIEMAP if the
1376                          * request fits within the file size.
1377                          */
1378                         fiemap->fm_mapped_extents = 1;
1379                         fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1380                         if (fiemap->fm_start + fiemap->fm_length <
1381                             fmkey->lfik_oa.o_size)
1382                                 fiemap->fm_extents[0].fe_length =
1383                                         fiemap->fm_length;
1384                         else
1385                                 fiemap->fm_extents[0].fe_length =
1386                                         fmkey->lfik_oa.o_size -
1387                                         fiemap->fm_start;
1388                         fiemap->fm_extents[0].fe_flags |=
1389                                 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1390                 }
1391                 GOTO(out_lsm, rc = 0);
1392         }
1393
1394         if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1395                 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1396
1397         OBD_ALLOC_LARGE(fm_local, buffer_size);
1398         if (fm_local == NULL)
1399                 GOTO(out_lsm, rc = -ENOMEM);
1400
1401         fs.fs_fm = fm_local;
1402         fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
1403
1404         fs.fs_start = fiemap->fm_start;
1405         /* fs.fs_start is beyond the end of the file */
1406         if (fs.fs_start > fmkey->lfik_oa.o_size)
1407                 GOTO(out_fm_local, rc = -EINVAL);
1408
1409         fs.fs_length = fiemap->fm_length;
1410         /* Calculate start stripe, last stripe and length of mapping */
1411         fs.fs_start_stripe = lov_stripe_number(lsm, fs.fs_start);
1412         fs.fs_end = (fs.fs_length == ~0ULL) ? fmkey->lfik_oa.o_size :
1413                                               fs.fs_start + fs.fs_length - 1;
1414         /* If fs_length != ~0ULL but fs_start+fs_length-1 exceeds file size */
1415         if (fs.fs_end > fmkey->lfik_oa.o_size) {
1416                 fs.fs_end = fmkey->lfik_oa.o_size;
1417                 fs.fs_length = fs.fs_end - fs.fs_start;
1418         }
1419
1420         fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, fs.fs_start, fs.fs_end,
1421                                                     fs.fs_start_stripe,
1422                                                     &stripe_count);
1423         fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fs.fs_start,
1424                                                      fs.fs_end,
1425                                                      &fs.fs_start_stripe);
1426         if (fs.fs_end_offset == -EINVAL)
1427                 GOTO(out_fm_local, rc = -EINVAL);
1428
1429         /**
1430          * Requested extent count exceeds the fiemap buffer size, shrink our
1431          * ambition.
1432          */
1433         if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1434                 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1435         if (fiemap->fm_extent_count == 0)
1436                 fs.fs_cnt_need = 0;
1437
1438         fs.fs_finish = false;
1439         fs.fs_enough = false;
1440         fs.fs_cur_extent = 0;
1441
1442         /* Check each stripe */
1443         for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
1444              --stripe_count,
1445              cur_stripe = (cur_stripe + 1) %
1446                            lsm->lsm_entries[0]->lsme_stripe_count) {
1447                 rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen, fmkey,
1448                                        cur_stripe, &fs);
1449                 if (rc < 0)
1450                         GOTO(out_fm_local, rc);
1451                 if (fs.fs_finish)
1452                         break;
1453         } /* for each stripe */
1454
1455         /* Indicate that we are returning device offsets unless the file
1456          * has just a single stripe */
1457         if (lsm->lsm_entries[0]->lsme_stripe_count > 1)
1458                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
1459
1460         if (fiemap->fm_extent_count == 0)
1461                 goto skip_last_device_calc;
1462
1463         /* Check if we have reached the last stripe and whether mapping for that
1464          * stripe is done. */
1465         if ((cur_stripe == fs.fs_last_stripe) && fs.fs_device_done)
1466                 fiemap->fm_extents[fs.fs_cur_extent - 1].fe_flags |=
1467                                                              FIEMAP_EXTENT_LAST;
1468 skip_last_device_calc:
1469         fiemap->fm_mapped_extents = fs.fs_cur_extent;
1470 out_fm_local:
1471         OBD_FREE_LARGE(fm_local, buffer_size);
1472
1473 out_lsm:
1474         lov_lsm_put(lsm);
1475
1476         return rc;
1477 }
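
/*
 * Illustrative user-space sketch (not part of this file): one way a caller
 * could drive the FIEMAP path serviced by lov_object_fiemap() above.  It
 * requests FIEMAP_FLAG_DEVICE_ORDER, which the code above requires for files
 * striped over more than one OST, and reissues the ioctl until an extent
 * carrying FIEMAP_EXTENT_LAST is returned.  The helper name, the 32-extent
 * buffer and the simple "advance fm_start" continuation are assumptions made
 * only for this sketch; Lustre-aware tools use a richer continuation
 * convention for device-ordered mappings, and the fallback flag value below
 * is assumed from Lustre's user-space FIEMAP header.
 *
 *	#include <stdio.h>
 *	#include <stdlib.h>
 *	#include <string.h>
 *	#include <fcntl.h>
 *	#include <unistd.h>
 *	#include <sys/ioctl.h>
 *	#include <linux/fs.h>
 *	#include <linux/fiemap.h>
 *
 *	#ifndef FIEMAP_FLAG_DEVICE_ORDER
 *	#define FIEMAP_FLAG_DEVICE_ORDER 0x40000000	// Lustre extension (assumed)
 *	#endif
 *
 *	static int dump_fiemap(const char *path)
 *	{
 *		const unsigned int count = 32;	// extents per call, arbitrary
 *		size_t len = sizeof(struct fiemap) +
 *			     count * sizeof(struct fiemap_extent);
 *		struct fiemap *fm = calloc(1, len);
 *		unsigned long long start = 0;
 *		unsigned int i;
 *		int last = 0;
 *		int fd = open(path, O_RDONLY);
 *
 *		if (fd < 0 || fm == NULL)
 *			goto out;
 *		while (!last) {
 *			memset(fm, 0, len);
 *			fm->fm_start = start;
 *			fm->fm_length = ~0ULL;		// map to end of file
 *			fm->fm_flags = FIEMAP_FLAG_DEVICE_ORDER;
 *			fm->fm_extent_count = count;
 *			// Without DEVICE_ORDER this fails on files striped
 *			// over more than one OST (see the check above).
 *			if (ioctl(fd, FS_IOC_FIEMAP, fm) < 0)
 *				break;
 *			if (fm->fm_mapped_extents == 0)
 *				break;
 *			for (i = 0; i < fm->fm_mapped_extents; i++) {
 *				struct fiemap_extent *fe = &fm->fm_extents[i];
 *
 *				printf("logical %llu physical %llu len %llu\n",
 *				       (unsigned long long)fe->fe_logical,
 *				       (unsigned long long)fe->fe_physical,
 *				       (unsigned long long)fe->fe_length);
 *				if (fe->fe_flags & FIEMAP_EXTENT_LAST)
 *					last = 1;
 *				// Minimal continuation: restart just past the
 *				// last extent seen so far.
 *				start = fe->fe_logical + fe->fe_length;
 *			}
 *		}
 *	out:
 *		if (fd >= 0)
 *			close(fd);
 *		free(fm);
 *		return last ? 0 : -1;
 *	}
 */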
1478
1479 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1480                                 struct lov_user_md __user *lum)
1481 {
1482         struct lov_object       *lov = cl2lov(obj);
1483         struct lov_stripe_md    *lsm;
1484         int                     rc = 0;
1485         ENTRY;
1486
1487         lsm = lov_lsm_addref(lov);
1488         if (lsm == NULL)
1489                 RETURN(-ENODATA);
1490
1491         rc = lov_getstripe(cl2lov(obj), lsm, lum);
1492         lov_lsm_put(lsm);
1493         RETURN(rc);
1494 }
1495
1496 static int lov_object_layout_get(const struct lu_env *env,
1497                                  struct cl_object *obj,
1498                                  struct cl_layout *cl)
1499 {
1500         struct lov_object *lov = cl2lov(obj);
1501         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1502         struct lu_buf *buf = &cl->cl_buf;
1503         ssize_t rc;
1504         ENTRY;
1505
1506         if (lsm == NULL) {
1507                 cl->cl_size = 0;
1508                 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1509
1510                 RETURN(0);
1511         }
1512
1513         cl->cl_size = lov_mds_md_size(lsm->lsm_entries[0]->lsme_stripe_count,
1514                                       lsm->lsm_magic);
1515         cl->cl_layout_gen = lsm->lsm_layout_gen;
1516
1517         rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
1518         lov_lsm_put(lsm);
1519
1520         RETURN(rc < 0 ? rc : 0);
1521 }
1522
1523 static loff_t lov_object_maxbytes(struct cl_object *obj)
1524 {
1525         struct lov_object *lov = cl2lov(obj);
1526         struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1527         loff_t maxbytes;
1528
1529         if (lsm == NULL)
1530                 return LLONG_MAX;
1531
1532         maxbytes = lsm->lsm_maxbytes;
1533
1534         lov_lsm_put(lsm);
1535
1536         return maxbytes;
1537 }
1538
1539 static const struct cl_object_operations lov_ops = {
1540         .coo_page_init    = lov_page_init,
1541         .coo_lock_init    = lov_lock_init,
1542         .coo_io_init      = lov_io_init,
1543         .coo_attr_get     = lov_attr_get,
1544         .coo_attr_update  = lov_attr_update,
1545         .coo_conf_set     = lov_conf_set,
1546         .coo_getstripe    = lov_object_getstripe,
1547         .coo_layout_get   = lov_object_layout_get,
1548         .coo_maxbytes     = lov_object_maxbytes,
1549         .coo_fiemap       = lov_object_fiemap,
1550 };
1551
1552 static const struct lu_object_operations lov_lu_obj_ops = {
1553         .loo_object_init      = lov_object_init,
1554         .loo_object_delete    = lov_object_delete,
1555         .loo_object_release   = NULL,
1556         .loo_object_free      = lov_object_free,
1557         .loo_object_print     = lov_object_print,
1558         .loo_object_invariant = NULL
1559 };
1560
1561 struct lu_object *lov_object_alloc(const struct lu_env *env,
1562                                    const struct lu_object_header *unused,
1563                                    struct lu_device *dev)
1564 {
1565         struct lov_object *lov;
1566         struct lu_object  *obj;
1567
1568         ENTRY;
1569         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1570         if (lov != NULL) {
1571                 obj = lov2lu(lov);
1572                 lu_object_init(obj, NULL, dev);
1573                 lov->lo_cl.co_ops = &lov_ops;
1574                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1575                 /*
1576                  * object io operation vector (cl_object::co_iop) is installed
1577                  * later in lov_object_init(), as different vectors are used
1578                  * for object with different layouts.
1579                  */
1580                 obj->lo_ops = &lov_lu_obj_ops;
1581         } else
1582                 obj = NULL;
1583         RETURN(obj);
1584 }
1585
1586 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1587 {
1588         struct lov_stripe_md *lsm = NULL;
1589
1590         lov_conf_freeze(lov);
1591         if (lov->lo_lsm != NULL) {
1592                 lsm = lsm_addref(lov->lo_lsm);
1593                 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1594                         lsm, atomic_read(&lsm->lsm_refc),
1595                         lov->lo_layout_invalid, current);
1596         }
1597         lov_conf_thaw(lov);
1598         return lsm;
1599 }
1600
1601 int lov_read_and_clear_async_rc(struct cl_object *clob)
1602 {
1603         struct lu_object *luobj;
1604         int rc = 0;
1605         ENTRY;
1606
1607         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1608                                  &lov_device_type);
1609         if (luobj != NULL) {
1610                 struct lov_object *lov = lu2lov(luobj);
1611
1612                 lov_conf_freeze(lov);
1613                 switch (lov->lo_type) {
1614                 case LLT_RAID0: {
1615                         struct lov_stripe_md *lsm;
1616                         int i;
1617
1618                         lsm = lov->lo_lsm;
1619                         LASSERT(lsm != NULL);
1620                         for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count;
1621                              i++) {
1622                                 struct lov_oinfo *loi =
1623                                         lsm->lsm_entries[0]->lsme_oinfo[i];
1624
1625                                 if (lov_oinfo_is_dummy(loi))
1626                                         continue;
1627
1628                                 if (loi->loi_ar.ar_rc && !rc)
1629                                         rc = loi->loi_ar.ar_rc;
1630                                 loi->loi_ar.ar_rc = 0;
1631                         }
1632                 }
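                /* fallthrough */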
1633                 case LLT_RELEASED:
1634                 case LLT_EMPTY:
1635                         break;
1636                 default:
1637                         LBUG();
1638                 }
1639                 lov_conf_thaw(lov);
1640         }
1641         RETURN(rc);
1642 }
1643 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
1644
1645 /** @} lov */