Whamcloud - gitweb
remove the lustre book-building bits from Makefile.am, in preparation for
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #include <asm/div64.h>
29 #else
30 #include <liblustre.h>
31 #endif
32
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
36
37 #include "lov_internal.h"
38
39 static void lov_init_set(struct lov_request_set *set)
40 {
41         set->set_count = 0;
42         set->set_completes = 0;
43         set->set_success = 0;
44         INIT_LIST_HEAD(&set->set_list);
45         atomic_set(&set->set_refcount, 1);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct list_head *pos, *n;
51         ENTRY;
52
53         LASSERT(set);
54         list_for_each_safe(pos, n, &set->set_list) {
55                 struct lov_request *req = list_entry(pos, struct lov_request,
56                                                      rq_link);
57                 list_del_init(&req->rq_link);
58
59                 if (req->rq_oa)
60                         obdo_free(req->rq_oa);
61                 if (req->rq_md)
62                         OBD_FREE(req->rq_md, req->rq_buflen);
63                 OBD_FREE(req, sizeof(*req));
64         }
65
66         if (set->set_pga) {
67                 int len = set->set_oabufs * sizeof(*set->set_pga);
68                 OBD_FREE(set->set_pga, len);
69         }
70         if (set->set_lockh)
71                 lov_llh_put(set->set_lockh);
72
73         OBD_FREE(set, sizeof(*set));
74         EXIT;
75 }
76
77 static void lov_update_set(struct lov_request_set *set,
78                            struct lov_request *req, int rc)
79 {
80         req->rq_complete = 1;
81         req->rq_rc = rc;
82
83         set->set_completes++;
84         if (rc == 0)
85                 set->set_success++;
86 }
87
88 int lov_update_common_set(struct lov_request_set *set,
89                           struct lov_request *req, int rc)
90 {
91         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
92         ENTRY;
93
94         lov_update_set(set, req, rc);
95
96         /* grace error on inactive ost */
97         if (rc && !lov->tgts[req->rq_idx].active)
98                 rc = 0;
99
100         /* FIXME in raid1 regime, should return 0 */
101         RETURN(rc);
102 }
103
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
105 {
106         list_add_tail(&req->rq_link, &set->set_list);
107         set->set_count++;
108 }
109
110 int lov_update_enqueue_set(struct lov_request_set *set,
111                            struct lov_request *req, int rc, int flags)
112 {
113         struct lustre_handle *lov_lockhp;
114         struct lov_oinfo *loi;
115         ENTRY;
116
117         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
119
120         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121          * It belongs in the OSC, except that the OSC doesn't have
122          * access to the real LOI -- it gets a copy, that we created
123          * above, and that copy can be arbitrarily out of date.
124          *
125          * The LOV API is due for a serious rewriting anyways, and this
126          * can be addressed then. */
127         if (rc == ELDLM_OK) {
128                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
129                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
130
131                 LASSERT(lock != NULL);
132                 loi->loi_rss = tmp;
133                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
134                 /* Extend KMS up to the end of this lock and no further
135                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
136                 if (tmp > lock->l_policy_data.l_extent.end)
137                         tmp = lock->l_policy_data.l_extent.end + 1;
138                 if (tmp >= loi->loi_kms) {
139                         LDLM_DEBUG(lock, "lock acquired, setting rss="
140                                    LPU64", kms="LPU64, loi->loi_rss, tmp);
141                         loi->loi_kms = tmp;
142                         loi->loi_kms_valid = 1;
143                 } else {
144                         LDLM_DEBUG(lock, "lock acquired, setting rss="
145                                    LPU64"; leaving kms="LPU64", end="LPU64,
146                                    loi->loi_rss, loi->loi_kms,
147                                    lock->l_policy_data.l_extent.end);
148                 }
149                 ldlm_lock_allow_match(lock);
150                 LDLM_LOCK_PUT(lock);
151         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
152                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
153                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
154                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
155                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
156                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
157                 rc = ELDLM_OK;
158         } else {
159                 struct obd_export *exp = set->set_exp;
160                 struct lov_obd *lov = &exp->exp_obd->u.lov;
161
162                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
163                 if (lov->tgts[req->rq_idx].active) {
164                         CERROR("error: enqueue objid "LPX64" subobj "
165                                 LPX64" on OST idx %d: rc = %d\n",
166                                 set->set_md->lsm_object_id, loi->loi_id,
167                                 loi->loi_ost_idx, rc);
168                 } else {
169                         rc = ELDLM_OK;
170                 }
171         }
172         lov_update_set(set, req, rc);
173         RETURN(rc);
174 }
175
176 static int enqueue_done(struct lov_request_set *set, __u32 mode)
177 {
178         struct lov_request *req;
179         struct lustre_handle *lov_lockhp = NULL;
180         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
181         int rc = 0;
182         ENTRY;
183
184         LASSERT(set->set_completes);
185         /* enqueue/match success, just return */
186         if (set->set_completes == set->set_success)
187                 RETURN(0);
188
189         /* cancel enqueued/matched locks */
190         list_for_each_entry(req, &set->set_list, rq_link) {
191                 if (!req->rq_complete || req->rq_rc)
192                         continue;
193
194                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
195                 LASSERT(lov_lockhp);
196                 if (lov_lockhp->cookie == 0)
197                         continue;
198
199                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
200                                 mode, lov_lockhp);
201                 if (rc && lov->tgts[req->rq_idx].active)
202                         CERROR("cancelling obdjid "LPX64" on OST "
203                                "idx %d error: rc = %d\n",
204                                req->rq_md->lsm_object_id, req->rq_idx, rc);
205         }
206         lov_llh_put(set->set_lockh);
207         RETURN(rc);
208 }
209
210 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
211 {
212         int rc = 0;
213         ENTRY;
214
215         if (set == NULL)
216                 RETURN(0);
217         LASSERT(set->set_exp);
218         if (set->set_completes)
219                 rc = enqueue_done(set, mode);
220         else
221                 lov_llh_put(set->set_lockh);
222
223         if (atomic_dec_and_test(&set->set_refcount))
224                 lov_finish_set(set);
225
226         RETURN(rc);
227 }
228
229 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
230                          ldlm_policy_data_t *policy, __u32 mode,
231                          struct lustre_handle *lockh,
232                          struct lov_request_set **reqset)
233 {
234         struct lov_obd *lov = &exp->exp_obd->u.lov;
235         struct lov_request_set *set;
236         int i, rc = 0;
237         struct lov_oinfo *loi;
238         ENTRY;
239
240         OBD_ALLOC(set, sizeof(*set));
241         if (set == NULL)
242                 RETURN(-ENOMEM);
243         lov_init_set(set);
244
245         set->set_exp = exp;
246         set->set_md = lsm;
247         set->set_lockh = lov_llh_new(lsm);
248         if (set->set_lockh == NULL)
249                 GOTO(out_set, rc = -ENOMEM);
250         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
251
252         loi = lsm->lsm_oinfo;
253         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
254                 struct lov_request *req;
255                 obd_off start, end;
256
257                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
258                                            policy->l_extent.end, &start, &end))
259                         continue;
260
261                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
262                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
263                         continue;
264                 }
265
266                 OBD_ALLOC(req, sizeof(*req));
267                 if (req == NULL)
268                         GOTO(out_set, rc = -ENOMEM);
269
270                 req->rq_buflen = sizeof(*req->rq_md) +
271                         sizeof(struct lov_oinfo);
272                 OBD_ALLOC(req->rq_md, req->rq_buflen);
273                 if (req->rq_md == NULL)
274                         GOTO(out_set, rc = -ENOMEM);
275
276                 req->rq_extent.start = start;
277                 req->rq_extent.end = end;
278
279                 req->rq_idx = loi->loi_ost_idx;
280                 req->rq_stripe = i;
281
282                 /* XXX LOV STACKING: submd should be from the subobj */
283                 req->rq_md->lsm_object_id = loi->loi_id;
284                 req->rq_md->lsm_stripe_count = 0;
285                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
286                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
287                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
288                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
289
290                 lov_set_add_req(req, set);
291         }
292         if (!set->set_count)
293                 GOTO(out_set, rc = -EIO);
294         *reqset = set;
295         RETURN(0);
296 out_set:
297         lov_fini_enqueue_set(set, mode);
298         RETURN(rc);
299 }
300
301 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
302                          int rc)
303 {
304         int ret = rc;
305         ENTRY;
306
307         if (rc == 1)
308                 ret = 0;
309         lov_update_set(set, req, ret);
310         RETURN(rc);
311 }
312
313 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
314 {
315         int rc = 0;
316         ENTRY;
317
318         if (set == NULL)
319                 RETURN(0);
320         LASSERT(set->set_exp);
321         if (set->set_completes) {
322                 if (set->set_count == set->set_success &&
323                     flags & LDLM_FL_TEST_LOCK)
324                         lov_llh_put(set->set_lockh);
325                 rc = enqueue_done(set, mode);
326         } else {
327                 lov_llh_put(set->set_lockh);
328         }
329
330         if (atomic_dec_and_test(&set->set_refcount))
331                 lov_finish_set(set);
332
333         RETURN(rc);
334 }
335
336 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
337                        ldlm_policy_data_t *policy, __u32 mode,
338                        struct lustre_handle *lockh,
339                        struct lov_request_set **reqset)
340 {
341         struct lov_obd *lov = &exp->exp_obd->u.lov;
342         struct lov_request_set *set;
343         int i, rc = 0;
344         struct lov_oinfo *loi;
345         ENTRY;
346
347         OBD_ALLOC(set, sizeof(*set));
348         if (set == NULL)
349                 RETURN(-ENOMEM);
350         lov_init_set(set);
351
352         set->set_exp = exp;
353         set->set_md = lsm;
354         set->set_lockh = lov_llh_new(lsm);
355         if (set->set_lockh == NULL)
356                 GOTO(out_set, rc = -ENOMEM);
357         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
358
359         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
360                 struct lov_request *req;
361                 obd_off start, end;
362
363                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
364                                            policy->l_extent.end, &start, &end))
365                         continue;
366
367                 /* FIXME raid1 should grace this error */
368                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
369                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
370                         GOTO(out_set, rc = -EIO);
371                 }
372
373                 OBD_ALLOC(req, sizeof(*req));
374                 if (req == NULL)
375                         GOTO(out_set, rc = -ENOMEM);
376
377                 req->rq_buflen = sizeof(*req->rq_md);
378                 OBD_ALLOC(req->rq_md, req->rq_buflen);
379                 if (req->rq_md == NULL)
380                         GOTO(out_set, rc = -ENOMEM);
381
382                 req->rq_extent.start = start;
383                 req->rq_extent.end = end;
384
385                 req->rq_idx = loi->loi_ost_idx;
386                 req->rq_stripe = i;
387
388                 /* XXX LOV STACKING: submd should be from the subobj */
389                 req->rq_md->lsm_object_id = loi->loi_id;
390                 req->rq_md->lsm_stripe_count = 0;
391
392                 lov_set_add_req(req, set);
393         }
394         if (!set->set_count)
395                 GOTO(out_set, rc = -EIO);
396         *reqset = set;
397         RETURN(rc);
398 out_set:
399         lov_fini_match_set(set, mode, 0);
400         RETURN(rc);
401 }
402
403 int lov_fini_cancel_set(struct lov_request_set *set)
404 {
405         int rc = 0;
406         ENTRY;
407
408         LASSERT(set->set_exp);
409         if (set == NULL)
410                 RETURN(0);
411
412         if (set->set_lockh)
413                 lov_llh_put(set->set_lockh);
414
415         if (atomic_dec_and_test(&set->set_refcount))
416                 lov_finish_set(set);
417
418         RETURN(rc);
419 }
420
421 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
422                         __u32 mode, struct lustre_handle *lockh,
423                         struct lov_request_set **reqset)
424 {
425         struct lov_request_set *set;
426         int i, rc = 0;
427         struct lov_oinfo *loi;
428         ENTRY;
429
430         OBD_ALLOC(set, sizeof(*set));
431         if (set == NULL)
432                 RETURN(-ENOMEM);
433         lov_init_set(set);
434
435         set->set_exp = exp;
436         set->set_md = lsm;
437         set->set_lockh = lov_handle2llh(lockh);
438         if (set->set_lockh == NULL) {
439                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
440                 GOTO(out_set, rc = -EINVAL);
441         }
442         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
443
444         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
445                 struct lov_request *req;
446                 struct lustre_handle *lov_lockhp;
447
448                 lov_lockhp = set->set_lockh->llh_handles + i;
449                 if (lov_lockhp->cookie == 0) {
450                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
451                                loi->loi_ost_idx, loi->loi_id);
452                         continue;
453                 }
454
455                 OBD_ALLOC(req, sizeof(*req));
456                 if (req == NULL)
457                         GOTO(out_set, rc = -ENOMEM);
458
459                 req->rq_buflen = sizeof(*req->rq_md);
460                 OBD_ALLOC(req->rq_md, req->rq_buflen);
461                 if (req->rq_md == NULL)
462                         GOTO(out_set, rc = -ENOMEM);
463
464                 req->rq_idx = loi->loi_ost_idx;
465                 req->rq_stripe = i;
466
467                 /* XXX LOV STACKING: submd should be from the subobj */
468                 req->rq_md->lsm_object_id = loi->loi_id;
469                 req->rq_md->lsm_stripe_count = 0;
470
471                 lov_set_add_req(req, set);
472         }
473         if (!set->set_count)
474                 GOTO(out_set, rc = -EIO);
475         *reqset = set;
476         RETURN(rc);
477 out_set:
478         lov_fini_cancel_set(set);
479         RETURN(rc);
480 }
481
482 static int create_done(struct obd_export *exp, struct lov_request_set *set,
483                        struct lov_stripe_md **lsmp)
484 {
485         struct lov_obd *lov = &exp->exp_obd->u.lov;
486         struct obd_trans_info *oti = set->set_oti;
487         struct obdo *src_oa = set->set_oa;
488         struct lov_request *req;
489         struct obdo *ret_oa = NULL;
490         int attrset = 0, rc = 0;
491         ENTRY;
492
493         LASSERT(set->set_completes);
494
495         if (!set->set_success)
496                 GOTO(cleanup, rc = -EIO);
497         if (*lsmp == NULL && set->set_count != set->set_success) {
498                 set->set_count = set->set_success;
499                 qos_shrink_lsm(set);
500         }
501
502         ret_oa = obdo_alloc();
503         if (ret_oa == NULL)
504                 GOTO(cleanup, rc = -ENOMEM);
505
506         list_for_each_entry(req, &set->set_list, rq_link) {
507                 if (!req->rq_complete || req->rq_rc)
508                         continue;
509                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
510                                 set->set_md, req->rq_stripe, &attrset);
511         }
512         if (src_oa->o_valid & OBD_MD_FLSIZE &&
513             ret_oa->o_size != src_oa->o_size) {
514                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
515                        src_oa->o_size, ret_oa->o_size);
516                 LBUG();
517         }
518         ret_oa->o_id = src_oa->o_id;
519         memcpy(src_oa, ret_oa, sizeof(*src_oa));
520         obdo_free(ret_oa);
521
522         *lsmp = set->set_md;
523         GOTO(done, rc = 0);
524
525 cleanup:
526         list_for_each_entry(req, &set->set_list, rq_link) {
527                 struct obd_export *sub_exp;
528                 int err = 0;
529
530                 if (!req->rq_complete || req->rq_rc)
531                         continue;
532
533                 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
534                 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
535                 if (err)
536                         CERROR("Failed to uncreate objid "LPX64" subobj "
537                                LPX64" on OST idx %d: rc = %d\n",
538                                set->set_oa->o_id, req->rq_oa->o_id,
539                                req->rq_idx, rc);
540         }
541         if (*lsmp == NULL)
542                 obd_free_memmd(exp, &set->set_md);
543 done:
544         if (oti && set->set_cookies) {
545                 oti->oti_logcookies = set->set_cookies;
546                 if (!set->set_cookie_sent) {
547                         oti_free_cookies(oti);
548                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
549                 } else {
550                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
551                 }
552         }
553         RETURN(rc);
554 }
555
556 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
557 {
558         int rc = 0;
559         ENTRY;
560
561         LASSERT(set->set_exp);
562         if (set == NULL)
563                 RETURN(0);
564         if (set->set_completes) {
565                 rc = create_done(set->set_exp, set, lsmp);
566                 /* FIXME update qos data here */
567         }
568
569         if (atomic_dec_and_test(&set->set_refcount))
570                 lov_finish_set(set);
571
572         RETURN(rc);
573 }
574
575 int lov_update_create_set(struct lov_request_set *set,
576                           struct lov_request *req, int rc)
577 {
578         struct obd_trans_info *oti = set->set_oti;
579         struct lov_stripe_md *lsm = set->set_md;
580         struct lov_oinfo *loi;
581         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
582         ENTRY;
583
584         req->rq_stripe = set->set_success;
585         loi = &lsm->lsm_oinfo[req->rq_stripe];
586
587         if (rc && lov->tgts[req->rq_idx].active) {
588                 CERROR("error creating objid "LPX64" sub-object"
589                        " on OST idx %d/%d: rc = %d\n",
590                        set->set_oa->o_id, req->rq_idx,
591                        lsm->lsm_stripe_count, rc);
592                 if (rc > 0) {
593                         CERROR("obd_create returned invalid err %d\n", rc);
594                         rc = -EIO;
595                 }
596         }
597         lov_update_set(set, req, rc);
598         if (rc)
599                 RETURN(rc);
600
601         if (oti && oti->oti_objid)
602                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
603
604         loi->loi_id = req->rq_oa->o_id;
605         loi->loi_ost_idx = req->rq_idx;
606         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
607                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
608         loi_init(loi);
609
610         if (set->set_cookies)
611                 ++oti->oti_logcookies;
612         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
613                 set->set_cookie_sent++;
614
615         RETURN(0);
616 }
617
618 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
619                         struct obdo *src_oa, struct obd_trans_info *oti,
620                         struct lov_request_set **reqset)
621 {
622         struct lov_obd *lov = &exp->exp_obd->u.lov;
623         struct lov_request_set *set;
624         int rc = 0, newea = 0;
625         ENTRY;
626
627         OBD_ALLOC(set, sizeof(*set));
628         if (set == NULL)
629                 RETURN(-ENOMEM);
630         lov_init_set(set);
631
632         set->set_exp = exp;
633         set->set_md = *lsmp;
634         set->set_oa = src_oa;
635         set->set_oti = oti;
636
637         if (set->set_md == NULL) {
638                 int stripes, stripe_cnt;
639                 stripe_cnt = lov_get_stripecnt(lov, 0);
640
641                 /* If the MDS file was truncated up to some size, stripe over
642                  * enough OSTs to allow the file to be created at that size. */
643                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
644                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
645                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
646
647                         if (stripes > lov->desc.ld_active_tgt_count)
648                                 GOTO(out_set, rc = -EFBIG);
649                         if (stripes < stripe_cnt)
650                                 stripes = stripe_cnt;
651                 } else {
652                         stripes = stripe_cnt;
653                 }
654
655                 rc = lov_alloc_memmd(&set->set_md, stripes,
656                                      lov->desc.ld_pattern ?
657                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
658                 if (rc < 0)
659                         goto out_set;
660                 newea = 1;
661         }
662
663         rc = qos_prep_create(lov, set, newea);
664         if (rc)
665                 goto out_lsm;
666
667         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
668                 oti_alloc_cookies(oti, set->set_count);
669                 if (!oti->oti_logcookies)
670                         goto out_lsm;
671                 set->set_cookies = oti->oti_logcookies;
672         }
673         *reqset = set;
674         RETURN(rc);
675
676 out_lsm:
677         if (*lsmp == NULL)
678                 obd_free_memmd(exp, &set->set_md);
679 out_set:
680         lov_fini_create_set(set, lsmp);
681         RETURN(rc);
682 }
683
684 static int common_attr_done(struct lov_request_set *set)
685 {
686         struct list_head *pos;
687         struct lov_request *req;
688         struct obdo *tmp_oa;
689         int rc = 0, attrset = 0;
690         ENTRY;
691
692         if (set->set_oa == NULL)
693                 RETURN(0);
694
695         if (!set->set_success)
696                 RETURN(-EIO);
697
698         tmp_oa = obdo_alloc();
699         if (tmp_oa == NULL)
700                 GOTO(out, rc = -ENOMEM);
701
702         list_for_each (pos, &set->set_list) {
703                 req = list_entry(pos, struct lov_request, rq_link);
704
705                 if (!req->rq_complete || req->rq_rc)
706                         continue;
707                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
708                         continue;
709                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
710                                 set->set_md, req->rq_stripe, &attrset);
711         }
712         if (!attrset) {
713                 CERROR("No stripes had valid attrs\n");
714                 rc = -EIO;
715         }
716         tmp_oa->o_id = set->set_oa->o_id;
717         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
718 out:
719         if (tmp_oa)
720                 obdo_free(tmp_oa);
721         RETURN(rc);
722
723 }
724
725 static int brw_done(struct lov_request_set *set)
726 {
727         struct lov_stripe_md *lsm = set->set_md;
728         struct lov_oinfo     *loi = NULL;
729         struct list_head *pos;
730         struct lov_request *req;
731         ENTRY;
732
733         list_for_each (pos, &set->set_list) {
734                 req = list_entry(pos, struct lov_request, rq_link);
735
736                 if (!req->rq_complete || req->rq_rc)
737                         continue;
738
739                 loi = &lsm->lsm_oinfo[req->rq_stripe];
740
741                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
742                         loi->loi_blocks = req->rq_oa->o_blocks;
743         }
744
745         RETURN(0);
746 }
747
748 int lov_fini_brw_set(struct lov_request_set *set)
749 {
750         int rc = 0;
751         ENTRY;
752
753         LASSERT(set->set_exp);
754         if (set == NULL)
755                 RETURN(0);
756         if (set->set_completes) {
757                 rc = brw_done(set);
758                 /* FIXME update qos data here */
759         }
760         if (atomic_dec_and_test(&set->set_refcount))
761                 lov_finish_set(set);
762
763         RETURN(rc);
764 }
765
766 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
767                      struct lov_stripe_md *lsm, obd_count oa_bufs,
768                      struct brw_page *pga, struct obd_trans_info *oti,
769                      struct lov_request_set **reqset)
770 {
771         struct {
772                 obd_count       index;
773                 obd_count       count;
774                 obd_count       off;
775         } *info = NULL;
776         struct lov_request_set *set;
777         struct lov_oinfo *loi = NULL;
778         struct lov_obd *lov = &exp->exp_obd->u.lov;
779         int rc = 0, i, shift;
780         ENTRY;
781
782         OBD_ALLOC(set, sizeof(*set));
783         if (set == NULL)
784                 RETURN(-ENOMEM);
785         lov_init_set(set);
786
787         set->set_exp = exp;
788         set->set_md = lsm;
789         set->set_oa = src_oa;
790         set->set_oti = oti;
791         set->set_oabufs = oa_bufs;
792         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
793         if (!set->set_pga)
794                 GOTO(out, rc = -ENOMEM);
795
796         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
797         if (!info)
798                 GOTO(out, rc = -ENOMEM);
799
800         /* calculate the page count for each stripe */
801         for (i = 0; i < oa_bufs; i++) {
802                 int stripe = lov_stripe_number(lsm, pga[i].off);
803                 info[stripe].count++;
804         }
805
806         /* alloc and initialize lov request */
807         shift = 0;
808         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
809                 struct lov_request *req;
810
811                 if (info[i].count == 0)
812                         continue;
813
814                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
815                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
816                         GOTO(out, rc = -EIO);
817                 }
818
819                 OBD_ALLOC(req, sizeof(*req));
820                 if (req == NULL)
821                         GOTO(out, rc = -ENOMEM);
822
823                 req->rq_oa = obdo_alloc();
824                 if (req->rq_oa == NULL)
825                         GOTO(out, rc = -ENOMEM);
826
827                 if (src_oa)
828                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
829                 req->rq_oa->o_id = loi->loi_id;
830
831                 req->rq_buflen = sizeof(*req->rq_md);
832                 OBD_ALLOC(req->rq_md, req->rq_buflen);
833                 if (req->rq_md == NULL)
834                         GOTO(out, rc = -ENOMEM);
835
836                 req->rq_idx = loi->loi_ost_idx;
837                 req->rq_stripe = i;
838
839                 /* XXX LOV STACKING */
840                 req->rq_md->lsm_object_id = loi->loi_id;
841                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
842                 req->rq_oabufs = info[i].count;
843                 req->rq_pgaidx = shift;
844                 shift += req->rq_oabufs;
845
846                 /* remember the index for sort brw_page array */
847                 info[i].index = req->rq_pgaidx;
848
849                 lov_set_add_req(req, set);
850         }
851         if (!set->set_count)
852                 GOTO(out, rc = -EIO);
853
854         /* rotate & sort the brw_page array */
855         for (i = 0; i < oa_bufs; i++) {
856                 int stripe = lov_stripe_number(lsm, pga[i].off);
857
858                 shift = info[stripe].index + info[stripe].off;
859                 LASSERT(shift < oa_bufs);
860                 set->set_pga[shift] = pga[i];
861                 lov_stripe_offset(lsm, pga[i].off, stripe,
862                                   &set->set_pga[shift].off);
863                 info[stripe].off++;
864         }
865 out:
866         if (info)
867                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
868
869         if (rc == 0)
870                 *reqset = set;
871         else
872                 lov_fini_brw_set(set);
873
874         RETURN(rc);
875 }
876
/* Completion hook for getattr request sets: hands the set to the shared
 * attribute completion routine and passes its result straight through. */
static int getattr_done(struct lov_request_set *set)
{
        int rc;

        rc = common_attr_done(set);
        return rc;
}
881
882 int lov_fini_getattr_set(struct lov_request_set *set)
883 {
884         int rc = 0;
885         ENTRY;
886
887         LASSERT(set->set_exp);
888         if (set == NULL)
889                 RETURN(0);
890         if (set->set_completes)
891                 rc = getattr_done(set);
892
893         if (atomic_dec_and_test(&set->set_refcount))
894                 lov_finish_set(set);
895
896         RETURN(rc);
897 }
898
899 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
900                          struct lov_stripe_md *lsm,
901                          struct lov_request_set **reqset)
902 {
903         struct lov_request_set *set;
904         struct lov_oinfo *loi = NULL;
905         struct lov_obd *lov = &exp->exp_obd->u.lov;
906         int rc = 0, i;
907         ENTRY;
908
909         OBD_ALLOC(set, sizeof(*set));
910         if (set == NULL)
911                 RETURN(-ENOMEM);
912         lov_init_set(set);
913
914         set->set_exp = exp;
915         set->set_md = lsm;
916         set->set_oa = src_oa;
917
918         loi = lsm->lsm_oinfo;
919         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
920                 struct lov_request *req;
921
922                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
923                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
924                         continue;
925                 }
926
927                 OBD_ALLOC(req, sizeof(*req));
928                 if (req == NULL)
929                         GOTO(out_set, rc = -ENOMEM);
930
931                 req->rq_stripe = i;
932                 req->rq_idx = loi->loi_ost_idx;
933
934                 req->rq_oa = obdo_alloc();
935                 if (req->rq_oa == NULL)
936                         GOTO(out_set, rc = -ENOMEM);
937                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
938                 req->rq_oa->o_id = loi->loi_id;
939
940                 lov_set_add_req(req, set);
941         }
942         if (!set->set_count)
943                 GOTO(out_set, rc = -EIO);
944         *reqset = set;
945         RETURN(rc);
946 out_set:
947         lov_fini_getattr_set(set);
948         RETURN(rc);
949 }
950
951 int lov_fini_destroy_set(struct lov_request_set *set)
952 {
953         ENTRY;
954
955         LASSERT(set->set_exp);
956         if (set == NULL)
957                 RETURN(0);
958         if (set->set_completes) {
959                 /* FIXME update qos data here */
960         }
961
962         if (atomic_dec_and_test(&set->set_refcount))
963                 lov_finish_set(set);
964
965         RETURN(0);
966 }
967
968 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
969                          struct lov_stripe_md *lsm,
970                          struct obd_trans_info *oti,
971                          struct lov_request_set **reqset)
972 {
973         struct lov_request_set *set;
974         struct lov_oinfo *loi = NULL;
975         struct lov_obd *lov = &exp->exp_obd->u.lov;
976         int rc = 0, cookie_set = 0, i;
977         ENTRY;
978
979         OBD_ALLOC(set, sizeof(*set));
980         if (set == NULL)
981                 RETURN(-ENOMEM);
982         lov_init_set(set);
983
984         set->set_exp = exp;
985         set->set_md = lsm;
986         set->set_oa = src_oa;
987         set->set_oti = oti;
988         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
989                 set->set_cookies = oti->oti_logcookies;
990
991         loi = lsm->lsm_oinfo;
992         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
993                 struct lov_request *req;
994
995                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
996                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
997                         continue;
998                 }
999
1000                 OBD_ALLOC(req, sizeof(*req));
1001                 if (req == NULL)
1002                         GOTO(out_set, rc = -ENOMEM);
1003
1004                 req->rq_stripe = i;
1005                 req->rq_idx = loi->loi_ost_idx;
1006
1007                 req->rq_oa = obdo_alloc();
1008                 if (req->rq_oa == NULL)
1009                         GOTO(out_set, rc = -ENOMEM);
1010                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1011                 req->rq_oa->o_id = loi->loi_id;
1012
1013                 /* Setup the first request's cookie position */
1014                 if (!cookie_set && set->set_cookies) {
1015                         oti->oti_logcookies = set->set_cookies + i;
1016                         cookie_set = 1;
1017                 }
1018                 lov_set_add_req(req, set);
1019         }
1020         if (!set->set_count)
1021                 GOTO(out_set, rc = -EIO);
1022         *reqset = set;
1023         RETURN(rc);
1024 out_set:
1025         lov_fini_destroy_set(set);
1026         RETURN(rc);
1027 }
1028
/* Completion hook for setattr request sets: hands the set to the shared
 * attribute completion routine and passes its result straight through. */
static int setattr_done(struct lov_request_set *set)
{
        int rc;

        rc = common_attr_done(set);
        return rc;
}
1033
1034 int lov_fini_setattr_set(struct lov_request_set *set)
1035 {
1036         int rc = 0;
1037         ENTRY;
1038
1039         LASSERT(set->set_exp);
1040         if (set == NULL)
1041                 RETURN(0);
1042         if (set->set_completes) {
1043                 rc = setattr_done(set);
1044                 /* FIXME update qos data here */
1045         }
1046
1047         if (atomic_dec_and_test(&set->set_refcount))
1048                 lov_finish_set(set);
1049         RETURN(rc);
1050 }
1051
1052 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1053                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1054                          struct lov_request_set **reqset)
1055 {
1056         struct lov_request_set *set;
1057         struct lov_oinfo *loi = NULL;
1058         struct lov_obd *lov = &exp->exp_obd->u.lov;
1059         int rc = 0, i;
1060         ENTRY;
1061
1062         OBD_ALLOC(set, sizeof(*set));
1063         if (set == NULL)
1064                 RETURN(-ENOMEM);
1065         lov_init_set(set);
1066
1067         set->set_exp = exp;
1068         set->set_md = lsm;
1069         set->set_oa = src_oa;
1070
1071         loi = lsm->lsm_oinfo;
1072         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1073                 struct lov_request *req;
1074
1075                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1076                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1077                         continue;
1078                 }
1079
1080                 OBD_ALLOC(req, sizeof(*req));
1081                 if (req == NULL)
1082                         GOTO(out_set, rc = -ENOMEM);
1083                 req->rq_stripe = i;
1084                 req->rq_idx = loi->loi_ost_idx;
1085
1086                 req->rq_oa = obdo_alloc();
1087                 if (req->rq_oa == NULL)
1088                         GOTO(out_set, rc = -ENOMEM);
1089                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1090                 req->rq_oa->o_id = loi->loi_id;
1091
1092                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1093                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1094                                               &req->rq_oa->o_size) < 0 &&
1095                             req->rq_oa->o_size)
1096                                 req->rq_oa->o_size--;
1097                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1098                                i, req->rq_oa->o_size, src_oa->o_size);
1099                 }
1100                 lov_set_add_req(req, set);
1101         }
1102         if (!set->set_count)
1103                 GOTO(out_set, rc = -EIO);
1104         *reqset = set;
1105         RETURN(rc);
1106 out_set:
1107         lov_fini_setattr_set(set);
1108         RETURN(rc);
1109 }
1110
1111 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1112                          int rc)
1113 {
1114         struct lov_stripe_md *lsm = set->set_md;
1115         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1116         ENTRY;
1117
1118         lov_update_set(set, req, rc);
1119         if (rc == 0) {
1120                 struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe];
1121                 loi->loi_kms = loi->loi_rss = req->rq_extent.start;
1122         }
1123         if (rc && !lov->tgts[req->rq_idx].active)
1124                 rc = 0;
1125         /* FIXME in raid1 regime, should return 0 */
1126         RETURN(rc);
1127 }
1128
1129 int lov_fini_punch_set(struct lov_request_set *set)
1130 {
1131         int rc = 0;
1132         ENTRY;
1133
1134         LASSERT(set->set_exp);
1135         if (set == NULL)
1136                 RETURN(0);
1137         if (set->set_completes) {
1138                 if (!set->set_success)
1139                         rc = -EIO;
1140                 /* FIXME update qos data here */
1141         }
1142
1143         if (atomic_dec_and_test(&set->set_refcount))
1144                 lov_finish_set(set);
1145
1146         RETURN(rc);
1147 }
1148
1149 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1150                        struct lov_stripe_md *lsm, obd_off start,
1151                        obd_off end, struct obd_trans_info *oti,
1152                        struct lov_request_set **reqset)
1153 {
1154         struct lov_request_set *set;
1155         struct lov_oinfo *loi = NULL;
1156         struct lov_obd *lov = &exp->exp_obd->u.lov;
1157         int rc = 0, i;
1158         ENTRY;
1159
1160         OBD_ALLOC(set, sizeof(*set));
1161         if (set == NULL)
1162                 RETURN(-ENOMEM);
1163         lov_init_set(set);
1164
1165         set->set_exp = exp;
1166         set->set_md = lsm;
1167         set->set_oa = src_oa;
1168
1169         loi = lsm->lsm_oinfo;
1170         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1171                 struct lov_request *req;
1172                 obd_off rs, re;
1173
1174                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1175                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1176                         continue;
1177                 }
1178
1179                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1180                         continue;
1181
1182                 OBD_ALLOC(req, sizeof(*req));
1183                 if (req == NULL)
1184                         GOTO(out_set, rc = -ENOMEM);
1185                 req->rq_stripe = i;
1186                 req->rq_idx = loi->loi_ost_idx;
1187
1188                 req->rq_oa = obdo_alloc();
1189                 if (req->rq_oa == NULL)
1190                         GOTO(out_set, rc = -ENOMEM);
1191                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1192                 req->rq_oa->o_id = loi->loi_id;
1193
1194                 req->rq_extent.start = rs;
1195                 req->rq_extent.end = re;
1196
1197                 lov_set_add_req(req, set);
1198         }
1199         if (!set->set_count)
1200                 GOTO(out_set, rc = -EIO);
1201         *reqset = set;
1202         RETURN(rc);
1203 out_set:
1204         lov_fini_punch_set(set);
1205         RETURN(rc);
1206 }
1207
1208 int lov_fini_sync_set(struct lov_request_set *set)
1209 {
1210         int rc = 0;
1211         ENTRY;
1212
1213         LASSERT(set->set_exp);
1214         if (set == NULL)
1215                 RETURN(0);
1216         if (set->set_completes) {
1217                 if (!set->set_success)
1218                         rc = -EIO;
1219                 /* FIXME update qos data here */
1220         }
1221
1222         if (atomic_dec_and_test(&set->set_refcount))
1223                 lov_finish_set(set);
1224
1225         RETURN(rc);
1226 }
1227
1228 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1229                       struct lov_stripe_md *lsm, obd_off start,
1230                       obd_off end, struct lov_request_set **reqset)
1231 {
1232         struct lov_request_set *set;
1233         struct lov_oinfo *loi = NULL;
1234         struct lov_obd *lov = &exp->exp_obd->u.lov;
1235         int rc = 0, i;
1236         ENTRY;
1237
1238         OBD_ALLOC(set, sizeof(*set));
1239         if (set == NULL)
1240                 RETURN(-ENOMEM);
1241         lov_init_set(set);
1242
1243         set->set_exp = exp;
1244         set->set_md = lsm;
1245         set->set_oa = src_oa;
1246
1247         loi = lsm->lsm_oinfo;
1248         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1249                 struct lov_request *req;
1250                 obd_off rs, re;
1251
1252                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1253                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1254                         continue;
1255                 }
1256
1257                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1258                         continue;
1259
1260                 OBD_ALLOC(req, sizeof(*req));
1261                 if (req == NULL)
1262                         GOTO(out_set, rc = -ENOMEM);
1263                 req->rq_stripe = i;
1264                 req->rq_idx = loi->loi_ost_idx;
1265
1266                 req->rq_oa = obdo_alloc();
1267                 if (req->rq_oa == NULL)
1268                         GOTO(out_set, rc = -ENOMEM);
1269                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1270                 req->rq_oa->o_id = loi->loi_id;
1271
1272                 req->rq_extent.start = rs;
1273                 req->rq_extent.end = re;
1274
1275                 lov_set_add_req(req, set);
1276         }
1277         if (!set->set_count)
1278                 GOTO(out_set, rc = -EIO);
1279         *reqset = set;
1280         RETURN(rc);
1281 out_set:
1282         lov_fini_sync_set(set);
1283         RETURN(rc);
1284 }