Whamcloud - gitweb
Branch: HEAD
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #include <asm/div64.h>
29 #else
30 #include <liblustre.h>
31 #endif
32
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
36
37 #include "lov_internal.h"
38
39 static void lov_init_set(struct lov_request_set *set)
40 {
41         set->set_count = 0;
42         set->set_completes = 0;
43         set->set_success = 0;
44         INIT_LIST_HEAD(&set->set_list);
45         atomic_set(&set->set_refcount, 1);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
51         struct list_head *pos, *n;
52         ENTRY;
53
54         LASSERT(set);
55         list_for_each_safe(pos, n, &set->set_list) {
56                 struct lov_request *req;
57                 struct lov_tgt_desc *tgt;
58
59                 req = list_entry(pos, struct lov_request, rq_link);
60                 LASSERT(req->rq_idx >= 0);
61
62                 tgt = lov->tgts + req->rq_idx;
63                 lov_tgt_decref(lov, tgt);
64
65                 list_del_init(&req->rq_link);
66
67                 if (req->rq_oa)
68                         obdo_free(req->rq_oa);
69                 if (req->rq_md)
70                         OBD_FREE(req->rq_md, req->rq_buflen);
71                 OBD_FREE(req, sizeof(*req));
72         }
73
74         if (set->set_pga) {
75                 int len = set->set_oabufs * sizeof(*set->set_pga);
76                 OBD_FREE(set->set_pga, len);
77         }
78         if (set->set_lockh)
79                 lov_llh_put(set->set_lockh);
80
81         OBD_FREE(set, sizeof(*set));
82         EXIT;
83 }
84
85 static void lov_update_set(struct lov_request_set *set,
86                            struct lov_request *req, int rc)
87 {
88         req->rq_complete = 1;
89         req->rq_rc = rc;
90
91         set->set_completes++;
92         if (rc == 0)
93                 set->set_success++;
94 }
95
96 int lov_update_common_set(struct lov_request_set *set,
97                           struct lov_request *req, int rc)
98 {
99         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
100         ENTRY;
101
102         lov_update_set(set, req, rc);
103
104         /* grace error on inactive ost */
105         if (rc) {
106                 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
107
108                 if (lov_tgt_active(lov, tgt, req->rq_gen))
109                         lov_tgt_decref(lov, tgt);
110                 else
111                         rc = 0;
112         }
113
114         /* FIXME in raid1 regime, should return 0 */
115         RETURN(rc);
116 }
117
118 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
119 {
120         list_add_tail(&req->rq_link, &set->set_list);
121         set->set_count++;
122 }
123
124 int lov_update_enqueue_set(struct lov_request_set *set,
125                            struct lov_request *req, int rc, int flags)
126 {
127         struct lustre_handle *lov_lockhp;
128         struct lov_oinfo *loi;
129         ENTRY;
130
131         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
132         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
133
134         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
135          * It belongs in the OSC, except that the OSC doesn't have
136          * access to the real LOI -- it gets a copy, that we created
137          * above, and that copy can be arbitrarily out of date.
138          *
139          * The LOV API is due for a serious rewriting anyways, and this
140          * can be addressed then. */
141         if (rc == ELDLM_OK) {
142                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
143                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
144
145                 LASSERT(lock != NULL);
146                 loi->loi_rss = tmp;
147                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
148                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
149                 /* Extend KMS up to the end of this lock and no further
150                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
151                 if (tmp > lock->l_policy_data.l_extent.end)
152                         tmp = lock->l_policy_data.l_extent.end + 1;
153                 if (tmp >= loi->loi_kms) {
154                         CDEBUG(D_INODE, "lock acquired, setting rss="
155                                LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
156                         loi->loi_kms = tmp;
157                         loi->loi_kms_valid = 1;
158                 } else {
159                         CDEBUG(D_INODE, "lock acquired, setting rss="
160                                LPU64"; leaving kms="LPU64", end="LPU64
161                                "\n", loi->loi_rss, loi->loi_kms,
162                                lock->l_policy_data.l_extent.end);
163                 }
164                 ldlm_lock_allow_match(lock);
165                 LDLM_LOCK_PUT(lock);
166         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
167                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
169                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
170                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
171                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
172                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
173                 rc = ELDLM_OK;
174         } else {
175                 struct obd_export *exp = set->set_exp;
176                 struct lov_obd *lov = &exp->exp_obd->u.lov;
177                 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
178
179                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
180                 if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
181                         lov_tgt_decref(lov, tgt);
182                         CERROR("error: enqueue objid "LPX64" subobj "
183                                 LPX64" on OST idx %d: rc = %d\n",
184                                 set->set_md->lsm_object_id, loi->loi_id,
185                                 loi->loi_ost_idx, rc);
186                 } else {
187                         CERROR("error: enqueue objid "LPX64" subobj "
188                                 LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
189                                 set->set_md->lsm_object_id, loi->loi_id,
190                                 loi->loi_ost_idx, rc);
191                         rc = ELDLM_OK;
192                 }
193         }
194         lov_update_set(set, req, rc);
195         RETURN(rc);
196 }
197
198 static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
199 {
200         struct list_head *pos;
201         struct lov_request *req;
202         struct lustre_handle *lov_lockhp = NULL;
203         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
204         struct lov_tgt_desc *tgt;
205         int rc = 0;
206         ENTRY;
207
208         LASSERT(set->set_completes);
209         /* enqueue/match success, just return */
210         if (set->set_completes == set->set_success) {
211                 if (flags & LDLM_FL_TEST_LOCK)
212                         lov_llh_put(set->set_lockh);
213                 RETURN(0);
214         }
215
216         /* cancel enqueued/matched locks */
217         list_for_each (pos, &set->set_list) {
218                 req = list_entry(pos, struct lov_request, rq_link);
219
220                 if (!req->rq_complete || req->rq_rc)
221                         continue;
222                 if (flags & LDLM_FL_TEST_LOCK)
223                         continue;
224
225                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
226                 LASSERT(lov_lockhp);
227                 if (lov_lockhp->cookie == 0)
228                         continue;
229
230                 tgt = lov->tgts + req->rq_idx;
231                 rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
232                 if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
233                         lov_tgt_decref(lov, tgt);
234                         CERROR("cancelling obdjid "LPX64" on OST "
235                                "idx %d error: rc = %d\n",
236                                req->rq_md->lsm_object_id, req->rq_idx, rc);
237                 }
238         }
239         lov_llh_put(set->set_lockh);
240         RETURN(rc);
241 }
242
243 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
244 {
245         int rc = 0;
246         ENTRY;
247
248         LASSERT(set->set_exp);
249         if (set == NULL)
250                 RETURN(0);
251         if (set->set_completes)
252                 rc = enqueue_done(set, mode, 0);
253         else
254                 lov_llh_put(set->set_lockh);
255
256         if (atomic_dec_and_test(&set->set_refcount))
257                 lov_finish_set(set);
258
259         RETURN(rc);
260 }
261
262 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
263                          ldlm_policy_data_t *policy, __u32 mode,
264                          struct lustre_handle *lockh,
265                          struct lov_request_set **reqset)
266 {
267         struct lov_obd *lov = &exp->exp_obd->u.lov;
268         struct lov_request_set *set;
269         int i, rc = 0;
270         struct lov_oinfo *loi;
271         ENTRY;
272
273         OBD_ALLOC(set, sizeof(*set));
274         if (set == NULL)
275                 RETURN(-ENOMEM);
276         lov_init_set(set);
277
278         set->set_exp = exp;
279         set->set_md = lsm;
280         set->set_lockh = lov_llh_new(lsm);
281         if (set->set_lockh == NULL)
282                 GOTO(out_set, rc = -ENOMEM);
283         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
284
285         loi = lsm->lsm_oinfo;
286         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
287                 struct lov_tgt_desc *tgt;
288                 struct lov_request *req;
289                 obd_off start, end;
290
291                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
292                                            policy->l_extent.end, &start, &end))
293                         continue;
294
295                 tgt = lov->tgts + loi->loi_ost_idx;
296                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
297                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
298                         continue;
299                 }
300
301                 OBD_ALLOC(req, sizeof(*req));
302                 if (req == NULL) {
303                         lov_tgt_decref(lov, tgt);
304                         GOTO(out_set, rc = -ENOMEM);
305                 }
306
307                 req->rq_buflen = sizeof(*req->rq_md) +
308                         sizeof(struct lov_oinfo);
309                 OBD_ALLOC(req->rq_md, req->rq_buflen);
310                 if (req->rq_md == NULL) {
311                         OBD_FREE(req, sizeof(*req));
312                         lov_tgt_decref(lov, tgt);
313                         GOTO(out_set, rc = -ENOMEM);
314                 }
315
316                 req->rq_extent.start = start;
317                 req->rq_extent.end = end;
318
319                 req->rq_idx = loi->loi_ost_idx;
320                 req->rq_gen = loi->loi_ost_gen;
321                 req->rq_stripe = i;
322
323                 /* XXX LOV STACKING: submd should be from the subobj */
324                 req->rq_md->lsm_object_id = loi->loi_id;
325                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
326                 req->rq_md->lsm_stripe_count = 0;
327                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
328                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
329                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
330                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
331                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
332
333                 lov_set_add_req(req, set);
334         }
335         if (!set->set_count)
336                 GOTO(out_set, rc = -EIO);
337         *reqset = set;
338         RETURN(0);
339 out_set:
340         lov_fini_enqueue_set(set, mode);
341         RETURN(rc);
342 }
343
344 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
345                          int rc)
346 {
347         ENTRY;
348         lov_update_set(set, req, !rc);
349         RETURN(rc);
350 }
351
352 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
353 {
354         int rc = 0;
355         ENTRY;
356
357         LASSERT(set->set_exp);
358         if (set == NULL)
359                 RETURN(0);
360         if (set->set_completes)
361                 rc = enqueue_done(set, mode, flags);
362         else
363                 lov_llh_put(set->set_lockh);
364
365         if (atomic_dec_and_test(&set->set_refcount))
366                 lov_finish_set(set);
367
368         RETURN(rc);
369 }
370
371 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
372                        ldlm_policy_data_t *policy, __u32 mode,
373                        struct lustre_handle *lockh,
374                        struct lov_request_set **reqset)
375 {
376         struct lov_obd *lov = &exp->exp_obd->u.lov;
377         struct lov_request_set *set;
378         int i, rc = 0;
379         struct lov_oinfo *loi;
380         ENTRY;
381
382         OBD_ALLOC(set, sizeof(*set));
383         if (set == NULL)
384                 RETURN(-ENOMEM);
385         lov_init_set(set);
386
387         set->set_exp = exp;
388         set->set_md = lsm;
389         set->set_lockh = lov_llh_new(lsm);
390         if (set->set_lockh == NULL)
391                 GOTO(out_set, rc = -ENOMEM);
392         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
393
394         loi = lsm->lsm_oinfo;
395         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
396                 struct lov_tgt_desc *tgt;
397                 struct lov_request *req;
398                 obd_off start, end;
399
400                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
401                                            policy->l_extent.end, &start, &end))
402                         continue;
403
404                 /* FIXME raid1 should grace this error */
405                 tgt = lov->tgts + loi->loi_ost_idx;
406                 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
407                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
408                         GOTO(out_set, rc = -EIO);
409                 }
410
411                 OBD_ALLOC(req, sizeof(*req));
412                 if (req == NULL) {
413                         lov_tgt_decref(lov, tgt);
414                         GOTO(out_set, rc = -ENOMEM);
415                 }
416
417                 req->rq_buflen = sizeof(*req->rq_md);
418                 OBD_ALLOC(req->rq_md, req->rq_buflen);
419                 if (req->rq_md == NULL) {
420                         OBD_FREE(req, sizeof(*req));
421                         lov_tgt_decref(lov, tgt);
422                         GOTO(out_set, rc = -ENOMEM);
423                 }
424
425                 req->rq_extent.start = start;
426                 req->rq_extent.end = end;
427
428                 req->rq_idx = loi->loi_ost_idx;
429                 req->rq_gen = loi->loi_ost_gen;
430                 req->rq_stripe = i;
431
432                 /* XXX LOV STACKING: submd should be from the subobj */
433                 req->rq_md->lsm_object_id = loi->loi_id;
434                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
435                 req->rq_md->lsm_stripe_count = 0;
436                 lov_set_add_req(req, set);
437         }
438         if (!set->set_count)
439                 GOTO(out_set, rc = -EIO);
440         *reqset = set;
441         RETURN(rc);
442 out_set:
443         lov_fini_match_set(set, mode, 0);
444         RETURN(rc);
445 }
446
447 int lov_fini_cancel_set(struct lov_request_set *set)
448 {
449         int rc = 0;
450         ENTRY;
451
452         LASSERT(set->set_exp);
453         if (set == NULL)
454                 RETURN(0);
455
456         if (set->set_lockh)
457                 lov_llh_put(set->set_lockh);
458
459         if (atomic_dec_and_test(&set->set_refcount))
460                 lov_finish_set(set);
461
462         RETURN(rc);
463 }
464
465 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
466                         __u32 mode, struct lustre_handle *lockh,
467                         struct lov_request_set **reqset)
468 {
469         struct lov_obd *lov = &exp->exp_obd->u.lov;
470         struct lov_request_set *set;
471         int i, rc = 0;
472         struct lov_oinfo *loi;
473         ENTRY;
474
475         OBD_ALLOC(set, sizeof(*set));
476         if (set == NULL)
477                 RETURN(-ENOMEM);
478         lov_init_set(set);
479
480         set->set_exp = exp;
481         set->set_md = lsm;
482         set->set_lockh = lov_handle2llh(lockh);
483         if (set->set_lockh == NULL) {
484                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
485                 GOTO(out_set, rc = -EINVAL);
486         }
487         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
488
489         loi = lsm->lsm_oinfo;
490         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
491                 struct lov_tgt_desc *tgt;
492                 struct lov_request *req;
493                 struct lustre_handle *lov_lockhp;
494
495                 lov_lockhp = set->set_lockh->llh_handles + i;
496                 if (lov_lockhp->cookie == 0) {
497                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
498                                loi->loi_ost_idx, loi->loi_id);
499                         continue;
500                 }
501
502                 tgt = lov->tgts + loi->loi_ost_idx;
503                 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
504                         CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
505                                loi->loi_ost_idx, loi->loi_id);
506                         continue;
507                 }
508
509                 OBD_ALLOC(req, sizeof(*req));
510                 if (req == NULL) {
511                         lov_tgt_decref(lov, tgt);
512                         GOTO(out_set, rc = -ENOMEM);
513                 }
514
515                 req->rq_buflen = sizeof(*req->rq_md);
516                 OBD_ALLOC(req->rq_md, req->rq_buflen);
517                 if (req->rq_md == NULL) {
518                         OBD_FREE(req, sizeof(*req));
519                         lov_tgt_decref(lov, tgt);
520                         GOTO(out_set, rc = -ENOMEM);
521                 }
522
523                 req->rq_idx = loi->loi_ost_idx;
524                 req->rq_gen = loi->loi_ost_gen;
525                 req->rq_stripe = i;
526
527                 /* XXX LOV STACKING: submd should be from the subobj */
528                 req->rq_md->lsm_object_id = loi->loi_id;
529                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
530                 req->rq_md->lsm_stripe_count = 0;
531                 lov_set_add_req(req, set);
532         }
533         if (!set->set_count)
534                 GOTO(out_set, rc = -EIO);
535         *reqset = set;
536         RETURN(rc);
537 out_set:
538         lov_fini_cancel_set(set);
539         RETURN(rc);
540 }
541
542 static int create_done(struct obd_export *exp, struct lov_request_set *set,
543                        struct lov_stripe_md **ea)
544 {
545         struct lov_obd *lov = &exp->exp_obd->u.lov;
546         struct obd_trans_info *oti = set->set_oti;
547         struct obdo *src_oa = set->set_oa;
548         struct list_head *pos;
549         struct lov_request *req;
550         struct obdo *ret_oa = NULL;
551         int attrset = 0, rc = 0;
552         ENTRY;
553
554         LASSERT(set->set_completes);
555
556         if (!set->set_success)
557                 GOTO(cleanup, rc = -EIO);
558
559         if (*ea == NULL && set->set_count != set->set_success) {
560                 set->set_count = set->set_success;
561                 qos_shrink_lsm(set);
562         }
563
564         ret_oa = obdo_alloc();
565         if (ret_oa == NULL)
566                 GOTO(cleanup, rc = -ENOMEM);
567
568         list_for_each (pos, &set->set_list) {
569                 req = list_entry(pos, struct lov_request, rq_link);
570                 if (!req->rq_complete || req->rq_rc)
571                         continue;
572                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
573                                 set->set_md, req->rq_stripe, &attrset);
574         }
575         if (src_oa->o_valid & OBD_MD_FLSIZE &&
576             ret_oa->o_size != src_oa->o_size) {
577                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
578                        src_oa->o_size, ret_oa->o_size);
579                 LBUG();
580         }
581         ret_oa->o_id = src_oa->o_id;
582         ret_oa->o_gr = src_oa->o_gr;
583         ret_oa->o_valid |= OBD_MD_FLGROUP;
584         memcpy(src_oa, ret_oa, sizeof(*src_oa));
585         obdo_free(ret_oa);
586
587         *ea = set->set_md;
588         GOTO(done, rc = 0);
589
590         EXIT;
591 cleanup:
592         list_for_each (pos, &set->set_list) {
593                 struct lov_tgt_desc *tgt;
594                 int err = 0;
595                 req = list_entry(pos, struct lov_request, rq_link);
596
597                 if (!req->rq_complete || req->rq_rc)
598                         continue;
599
600                 tgt = lov->tgts + req->rq_idx;
601                 if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
602                         CERROR("Failed to uncreate objid "LPX64" subobj "
603                                LPX64" on OST idx %d: osc inactive.\n",
604                                set->set_oa->o_id, req->rq_oa->o_id,
605                                req->rq_idx);
606                         continue;
607                 }
608
609                 err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
610                 lov_tgt_decref(lov, tgt);
611                 if (err)
612                         CERROR("Failed to uncreate objid "LPX64" subobj "
613                                LPX64" on OST idx %d: rc = %d\n",
614                                set->set_oa->o_id, req->rq_oa->o_id,
615                                req->rq_idx, rc);
616         }
617         if (*ea == NULL)
618                 obd_free_memmd(exp, &set->set_md);
619 done:
620         if (oti && set->set_cookies) {
621                 oti->oti_logcookies = set->set_cookies;
622                 if (!set->set_cookie_sent) {
623                         oti_free_cookies(oti);
624                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
625                 } else {
626                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
627                 }
628         }
629         return rc;
630 }
631
632 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
633 {
634         int rc = 0;
635         ENTRY;
636
637         LASSERT(set->set_exp);
638         if (set == NULL)
639                 RETURN(0);
640         if (set->set_completes) {
641                 rc = create_done(set->set_exp, set, ea);
642                 /* FIXME update qos data here */
643         }
644
645         if (atomic_dec_and_test(&set->set_refcount))
646                 lov_finish_set(set);
647
648         RETURN(rc);
649 }
650
651 int lov_update_create_set(struct lov_request_set *set,
652                           struct lov_request *req, int rc)
653 {
654         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
655         struct obd_trans_info *oti = set->set_oti;
656         struct lov_stripe_md *lsm = set->set_md;
657         struct lov_oinfo *loi;
658         struct lov_tgt_desc *tgt;
659         ENTRY;
660
661         req->rq_stripe = set->set_success;
662         loi = &lsm->lsm_oinfo[req->rq_stripe];
663         tgt = lov->tgts + req->rq_idx;
664
665         if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
666                 lov_tgt_decref(lov, tgt);
667                 CERROR("error creating objid "LPX64" sub-object"
668                        " on OST idx %d/%d: rc = %d\n",
669                        set->set_oa->o_id, req->rq_idx,
670                        lsm->lsm_stripe_count, rc);
671                 if (rc > 0) {
672                         CERROR("obd_create returned invalid err %d\n", rc);
673                         rc = -EIO;
674                 }
675         }
676         lov_update_set(set, req, rc);
677         if (rc)
678                 RETURN(rc);
679
680         if (oti && oti->oti_objid)
681                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
682
683         loi->loi_id = req->rq_oa->o_id;
684         loi->loi_gr = req->rq_oa->o_gr;
685         loi->loi_ost_idx = req->rq_idx;
686         loi->loi_ost_gen = req->rq_gen;
687         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
688                "idx %d gen %d\n",
689                lsm->lsm_object_id, loi->loi_id, loi->loi_id,
690                req->rq_idx, req->rq_gen);
691         loi_init(loi);
692
693         if (set->set_cookies)
694                 ++oti->oti_logcookies;
695         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
696                 set->set_cookie_sent++;
697
698         RETURN(0);
699 }
700
701 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
702                         struct obdo *src_oa, struct obd_trans_info *oti,
703                         struct lov_request_set **reqset)
704 {
705         struct lov_obd *lov = &exp->exp_obd->u.lov;
706         struct lov_request_set *set;
707         int rc = 0, newea = 0;
708         ENTRY;
709
710         OBD_ALLOC(set, sizeof(*set));
711         if (set == NULL)
712                 RETURN(-ENOMEM);
713         lov_init_set(set);
714
715         set->set_exp = exp;
716         set->set_md = *ea;
717         set->set_oa = src_oa;
718         set->set_oti = oti;
719
720         if (set->set_md == NULL) {
721                 int stripes, stripe_cnt;
722                 stripe_cnt = lov_get_stripecnt(lov, 0);
723
724                 /* If the MDS file was truncated up to some size, stripe over
725                  * enough OSTs to allow the file to be created at that size. */
726                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
727                         stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
728                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
729
730                         if (stripes > lov->desc.ld_active_tgt_count)
731                                 GOTO(out_set, rc = -EFBIG);
732                         if (stripes < stripe_cnt)
733                                 stripes = stripe_cnt;
734                 } else {
735                         stripes = stripe_cnt;
736                 }
737
738                 rc = lov_alloc_memmd(&set->set_md, stripes,
739                                      lov->desc.ld_pattern ?
740                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
741                 if (rc < 0)
742                         goto out_set;
743                 newea = 1;
744         }
745
746         rc = qos_prep_create(lov, set, newea);
747         if (rc)
748                 goto out_lsm;
749
750         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
751                 oti_alloc_cookies(oti, set->set_count);
752                 if (!oti->oti_logcookies)
753                         goto out_lsm;
754                 set->set_cookies = oti->oti_logcookies;
755         }
756         *reqset = set;
757         RETURN(rc);
758
759 out_lsm:
760         if (*ea == NULL)
761                 obd_free_memmd(exp, &set->set_md);
762 out_set:
763         lov_fini_create_set(set, ea);
764         RETURN(rc);
765 }
766
767 static int common_attr_done(struct lov_request_set *set)
768 {
769         struct list_head *pos;
770         struct lov_request *req;
771         struct obdo *tmp_oa;
772         int rc = 0, attrset = 0;
773         ENTRY;
774
775         if (set->set_oa == NULL)
776                 RETURN(0);
777
778         if (!set->set_success)
779                 RETURN(-EIO);
780
781         tmp_oa = obdo_alloc();
782         if (tmp_oa == NULL)
783                 GOTO(out, rc = -ENOMEM);
784
785         list_for_each (pos, &set->set_list) {
786                 req = list_entry(pos, struct lov_request, rq_link);
787
788                 if (!req->rq_complete || req->rq_rc)
789                         continue;
790                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
791                         continue;
792                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
793                                 set->set_md, req->rq_stripe, &attrset);
794         }
795         if (!attrset) {
796                 CERROR("No stripes had valid attrs\n");
797                 rc = -EIO;
798         }
799         tmp_oa->o_id = set->set_oa->o_id;
800         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
801 out:
802         if (tmp_oa)
803                 obdo_free(tmp_oa);
804         RETURN(rc);
805
806 }
807
808 static int brw_done(struct lov_request_set *set)
809 {
810         struct lov_stripe_md *lsm = set->set_md;
811         struct lov_oinfo     *loi = NULL;
812         struct list_head *pos;
813         struct lov_request *req;
814         ENTRY;
815                                                                                                                              
816         list_for_each (pos, &set->set_list) {
817                 req = list_entry(pos, struct lov_request, rq_link);
818                                                                                                                              
819                 if (!req->rq_complete || req->rq_rc)
820                         continue;
821                                                                                                                              
822                 loi = &lsm->lsm_oinfo[req->rq_stripe];
823                                                                                                                              
824                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
825                         loi->loi_blocks = req->rq_oa->o_blocks;
826         }
827                                                                                                                              
828         RETURN(0);
829 }
830
831 int lov_fini_brw_set(struct lov_request_set *set)
832 {
833         int rc = 0;
834         ENTRY;
835
836         LASSERT(set->set_exp);
837         if (set == NULL)
838                 RETURN(0);
839         if (set->set_completes) {
840                 rc = brw_done(set);
841                 /* FIXME update qos data here */
842         }
843         if (atomic_dec_and_test(&set->set_refcount))
844                 lov_finish_set(set);
845
846         RETURN(rc);
847 }
848
849 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
850                      struct lov_stripe_md *lsm, obd_count oa_bufs,
851                      struct brw_page *pga, struct obd_trans_info *oti,
852                      struct lov_request_set **reqset)
853 {
854         struct {
855                 obd_count       index;
856                 obd_count       count;
857                 obd_count       off;
858         } *info = NULL;
859         struct lov_request_set *set;
860         struct lov_oinfo *loi = NULL;
861         struct lov_obd *lov = &exp->exp_obd->u.lov;
862         int rc = 0, i, shift;
863         ENTRY;
864
865         OBD_ALLOC(set, sizeof(*set));
866         if (set == NULL)
867                 RETURN(-ENOMEM);
868         lov_init_set(set);
869
870         set->set_exp = exp;
871         set->set_md = lsm;
872         set->set_oa = src_oa;
873         set->set_oti = oti;
874         set->set_oabufs = oa_bufs;
875         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
876         if (!set->set_pga)
877                 GOTO(out, rc = -ENOMEM);
878
879         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
880         if (!info)
881                 GOTO(out, rc = -ENOMEM);
882
883         /* calculate the page count for each stripe */
884         for (i = 0; i < oa_bufs; i++) {
885                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
886                 info[stripe].count++;
887         }
888
889         /* alloc and initialize lov request */
890         loi = lsm->lsm_oinfo;
891         shift = 0;
892         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
893                 struct lov_request *req;
894                 struct lov_tgt_desc *tgt;
895
896                 if (info[i].count == 0)
897                         continue;
898
899                 tgt = lov->tgts + loi->loi_ost_idx;
900                 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
901                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
902                         GOTO(out, rc = -EIO);
903                 }
904
905                 OBD_ALLOC(req, sizeof(*req));
906                 if (req == NULL) {
907                         lov_tgt_decref(lov, tgt);
908                         GOTO(out, rc = -ENOMEM);
909                 }
910
911                 req->rq_oa = obdo_alloc();
912                 if (req->rq_oa == NULL) {
913                         OBD_FREE(req, sizeof(*req));
914                         lov_tgt_decref(lov, tgt);
915                         GOTO(out, rc = -ENOMEM);
916                 }
917
918                 if (src_oa)
919                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
920                 req->rq_oa->o_id = loi->loi_id;
921                 req->rq_buflen = sizeof(*req->rq_md);
922                 OBD_ALLOC(req->rq_md, req->rq_buflen);
923                 if (req->rq_md == NULL) {
924                         obdo_free(req->rq_oa);
925                         OBD_FREE(req, sizeof(*req));
926                         lov_tgt_decref(lov, tgt);
927                         GOTO(out, rc = -ENOMEM);
928                 }
929
930                 req->rq_idx = loi->loi_ost_idx;
931                 req->rq_gen = loi->loi_ost_gen;
932                 req->rq_stripe = i;
933
934                 /* XXX LOV STACKING */
935                 req->rq_md->lsm_object_id = loi->loi_id;
936                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
937                 req->rq_oabufs = info[i].count;
938                 req->rq_pgaidx = shift;
939                 shift += req->rq_oabufs;
940
941                 /* remember the index for sort brw_page array */
942                 info[i].index = req->rq_pgaidx;
943                 lov_set_add_req(req, set);
944         }
945         if (!set->set_count)
946                 GOTO(out, rc = -EIO);
947
948         /* rotate & sort the brw_page array */
949         for (i = 0; i < oa_bufs; i++) {
950                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
951
952                 shift = info[stripe].index + info[stripe].off;
953                 LASSERT(shift < oa_bufs);
954                 set->set_pga[shift] = pga[i];
955                 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
956                                   &set->set_pga[shift].disk_offset);
957                 info[stripe].off++;
958         }
959 out:
960         if (info)
961                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
962
963         if (rc == 0)
964                 *reqset = set;
965         else
966                 lov_fini_brw_set(set);
967
968         RETURN(rc);
969 }
970
/*
 * Completion step for a getattr request set: hands the set to
 * common_attr_done() and passes its result back unchanged.
 */
static int getattr_done(struct lov_request_set *set)
{
        int rc;

        rc = common_attr_done(set);
        return rc;
}
975
976 int lov_fini_getattr_set(struct lov_request_set *set)
977 {
978         int rc = 0;
979         ENTRY;
980
981         LASSERT(set->set_exp);
982         if (set == NULL)
983                 RETURN(0);
984         if (set->set_completes)
985                 rc = getattr_done(set);
986
987         if (atomic_dec_and_test(&set->set_refcount))
988                 lov_finish_set(set);
989
990         RETURN(rc);
991 }
992
993 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
994                          struct lov_stripe_md *lsm,
995                          struct lov_request_set **reqset)
996 {
997         struct lov_request_set *set;
998         struct lov_oinfo *loi = NULL;
999         struct lov_obd *lov = &exp->exp_obd->u.lov;
1000         int rc = 0, i;
1001         ENTRY;
1002
1003         OBD_ALLOC(set, sizeof(*set));
1004         if (set == NULL)
1005                 RETURN(-ENOMEM);
1006         lov_init_set(set);
1007
1008         set->set_exp = exp;
1009         set->set_md = lsm;
1010         set->set_oa = src_oa;
1011
1012         loi = lsm->lsm_oinfo;
1013         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1014                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1015                 struct lov_request *req;
1016
1017                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1018                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1019                         continue;
1020                 }
1021
1022                 OBD_ALLOC(req, sizeof(*req));
1023                 if (req == NULL) {
1024                         lov_tgt_decref(lov, tgt);
1025                         GOTO(out_set, rc = -ENOMEM);
1026                 }
1027
1028                 req->rq_stripe = i;
1029                 req->rq_idx = loi->loi_ost_idx;
1030                 req->rq_gen = loi->loi_ost_gen;
1031
1032                 req->rq_oa = obdo_alloc();
1033                 if (req->rq_oa == NULL) {
1034                         OBD_FREE(req, sizeof(*req));
1035                         lov_tgt_decref(lov, tgt);
1036                         GOTO(out_set, rc = -ENOMEM);
1037                 }
1038                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1039                 req->rq_oa->o_id = loi->loi_id;
1040                 lov_set_add_req(req, set);
1041         }
1042         if (!set->set_count)
1043                 GOTO(out_set, rc = -EIO);
1044         *reqset = set;
1045         RETURN(rc);
1046 out_set:
1047         lov_fini_getattr_set(set);
1048         RETURN(rc);
1049 }
1050
1051 int lov_fini_destroy_set(struct lov_request_set *set)
1052 {
1053         ENTRY;
1054
1055         LASSERT(set->set_exp);
1056         if (set == NULL)
1057                 RETURN(0);
1058         if (set->set_completes) {
1059                 /* FIXME update qos data here */
1060         }
1061
1062         if (atomic_dec_and_test(&set->set_refcount))
1063                 lov_finish_set(set);
1064
1065         RETURN(0);
1066 }
1067
1068 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
1069                          struct lov_stripe_md *lsm,
1070                          struct obd_trans_info *oti,
1071                          struct lov_request_set **reqset)
1072 {
1073         struct lov_request_set *set;
1074         struct lov_oinfo *loi = NULL;
1075         struct lov_obd *lov = &exp->exp_obd->u.lov;
1076         int rc = 0, cookie_set = 0, i;
1077         ENTRY;
1078
1079         OBD_ALLOC(set, sizeof(*set));
1080         if (set == NULL)
1081                 RETURN(-ENOMEM);
1082         lov_init_set(set);
1083
1084         set->set_exp = exp;
1085         set->set_md = lsm;
1086         set->set_oa = src_oa;
1087         set->set_oti = oti;
1088         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1089                 set->set_cookies = oti->oti_logcookies;
1090
1091         loi = lsm->lsm_oinfo;
1092         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1093                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1094                 struct lov_request *req;
1095
1096                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1097                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1098                         continue;
1099                 }
1100
1101                 OBD_ALLOC(req, sizeof(*req));
1102                 if (req == NULL) {
1103                         lov_tgt_decref(lov, tgt);
1104                         GOTO(out_set, rc = -ENOMEM);
1105                 }
1106
1107                 req->rq_stripe = i;
1108                 req->rq_idx = loi->loi_ost_idx;
1109                 req->rq_gen = loi->loi_ost_gen;
1110
1111                 req->rq_oa = obdo_alloc();
1112                 if (req->rq_oa == NULL) {
1113                         OBD_FREE(req, sizeof(*req));
1114                         lov_tgt_decref(lov, tgt);
1115                         GOTO(out_set, rc = -ENOMEM);
1116                 }
1117
1118                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1119                 req->rq_oa->o_id = loi->loi_id;
1120
1121                 /* Setup the first request's cookie position */
1122                 if (!cookie_set && set->set_cookies) {
1123                         oti->oti_logcookies = set->set_cookies + i;
1124                         cookie_set = 1;
1125                 }
1126                 lov_set_add_req(req, set);
1127         }
1128         if (!set->set_count)
1129                 GOTO(out_set, rc = -EIO);
1130         *reqset = set;
1131         RETURN(rc);
1132 out_set:
1133         lov_fini_destroy_set(set);
1134         RETURN(rc);
1135 }
1136
/*
 * Completion step for a setattr request set: hands the set to
 * common_attr_done() and passes its result back unchanged.
 */
static int setattr_done(struct lov_request_set *set)
{
        int rc;

        rc = common_attr_done(set);
        return rc;
}
1141
1142 int lov_fini_setattr_set(struct lov_request_set *set)
1143 {
1144         int rc = 0;
1145         ENTRY;
1146
1147         LASSERT(set->set_exp);
1148         if (set == NULL)
1149                 RETURN(0);
1150         if (set->set_completes) {
1151                 rc = setattr_done(set);
1152                 /* FIXME update qos data here */
1153         }
1154
1155         if (atomic_dec_and_test(&set->set_refcount))
1156                 lov_finish_set(set);
1157         RETURN(rc);
1158 }
1159
1160 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1161                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1162                          struct lov_request_set **reqset)
1163 {
1164         struct lov_request_set *set;
1165         struct lov_oinfo *loi = NULL;
1166         struct lov_obd *lov = &exp->exp_obd->u.lov;
1167         int rc = 0, i;
1168         ENTRY;
1169
1170         OBD_ALLOC(set, sizeof(*set));
1171         if (set == NULL)
1172                 RETURN(-ENOMEM);
1173         lov_init_set(set);
1174
1175         set->set_exp = exp;
1176         set->set_md = lsm;
1177         set->set_oa = src_oa;
1178
1179         loi = lsm->lsm_oinfo;
1180         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1181                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1182                 struct lov_request *req;
1183
1184                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1185                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1186                         continue;
1187                 }
1188
1189                 OBD_ALLOC(req, sizeof(*req));
1190                 if (req == NULL) {
1191                         lov_tgt_decref(lov, tgt);
1192                         GOTO(out_set, rc = -ENOMEM);
1193                 }
1194
1195                 req->rq_stripe = i;
1196                 req->rq_idx = loi->loi_ost_idx;
1197                 req->rq_gen = loi->loi_ost_gen;
1198
1199                 req->rq_oa = obdo_alloc();
1200                 if (req->rq_oa == NULL) {
1201                         OBD_FREE(req, sizeof(*req));
1202                         lov_tgt_decref(lov, tgt);
1203                         GOTO(out_set, rc = -ENOMEM);
1204                 }
1205
1206                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1207                 req->rq_oa->o_id = loi->loi_id;
1208                 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
1209
1210                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1211                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1212                                               &req->rq_oa->o_size) < 0 &&
1213                             req->rq_oa->o_size)
1214                                 req->rq_oa->o_size--;
1215                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1216                                i, req->rq_oa->o_size, src_oa->o_size);
1217                 }
1218                 lov_set_add_req(req, set);
1219         }
1220         if (!set->set_count)
1221                 GOTO(out_set, rc = -EIO);
1222         *reqset = set;
1223         RETURN(rc);
1224 out_set:
1225         lov_fini_setattr_set(set);
1226         RETURN(rc);
1227 }
1228
1229 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1230                          int rc)
1231 {
1232         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1233         ENTRY;
1234
1235         lov_update_set(set, req, rc);
1236         if (rc) {
1237                 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
1238
1239                 if (lov_tgt_active(lov, tgt, req->rq_gen))
1240                         lov_tgt_decref(lov, tgt);
1241                 else
1242                         rc = 0;
1243         }
1244
1245         /* FIXME in raid1 regime, should return 0 */
1246         RETURN(rc);
1247 }
1248
1249 int lov_fini_punch_set(struct lov_request_set *set)
1250 {
1251         int rc = 0;
1252         ENTRY;
1253
1254         LASSERT(set->set_exp);
1255         if (set == NULL)
1256                 RETURN(0);
1257         if (set->set_completes) {
1258                 if (!set->set_success)
1259                         rc = -EIO;
1260                 /* FIXME update qos data here */
1261         }
1262
1263         if (atomic_dec_and_test(&set->set_refcount))
1264                 lov_finish_set(set);
1265
1266         RETURN(rc);
1267 }
1268
1269 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1270                        struct lov_stripe_md *lsm, obd_off start,
1271                        obd_off end, struct obd_trans_info *oti,
1272                        struct lov_request_set **reqset)
1273 {
1274         struct lov_request_set *set;
1275         struct lov_oinfo *loi = NULL;
1276         struct lov_obd *lov = &exp->exp_obd->u.lov;
1277         int rc = 0, i;
1278         ENTRY;
1279
1280         OBD_ALLOC(set, sizeof(*set));
1281         if (set == NULL)
1282                 RETURN(-ENOMEM);
1283         lov_init_set(set);
1284
1285         set->set_exp = exp;
1286         set->set_md = lsm;
1287         set->set_oa = src_oa;
1288
1289         loi = lsm->lsm_oinfo;
1290         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1291                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1292                 struct lov_request *req;
1293                 obd_off rs, re;
1294
1295                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1296                         continue;
1297
1298                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1299                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1300                         continue;
1301                 }
1302
1303                 OBD_ALLOC(req, sizeof(*req));
1304                 if (req == NULL) {
1305                         lov_tgt_decref(lov, tgt);
1306                         GOTO(out_set, rc = -ENOMEM);
1307                 }
1308
1309                 req->rq_stripe = i;
1310                 req->rq_idx = loi->loi_ost_idx;
1311                 req->rq_gen = loi->loi_ost_gen;
1312
1313                 req->rq_oa = obdo_alloc();
1314                 if (req->rq_oa == NULL) {
1315                         OBD_FREE(req, sizeof(*req));
1316                         lov_tgt_decref(lov, tgt);
1317                         GOTO(out_set, rc = -ENOMEM);
1318                 }
1319
1320                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1321                 req->rq_oa->o_id = loi->loi_id;
1322                 req->rq_oa->o_gr = loi->loi_gr;
1323                 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1324
1325                 req->rq_extent.start = rs;
1326                 req->rq_extent.end = re;
1327
1328                 lov_set_add_req(req, set);
1329         }
1330         if (!set->set_count)
1331                 GOTO(out_set, rc = -EIO);
1332         *reqset = set;
1333         RETURN(rc);
1334 out_set:
1335         lov_fini_punch_set(set);
1336         RETURN(rc);
1337 }
1338
1339 int lov_fini_sync_set(struct lov_request_set *set)
1340 {
1341         int rc = 0;
1342         ENTRY;
1343
1344         LASSERT(set->set_exp);
1345         if (set == NULL)
1346                 RETURN(0);
1347         if (set->set_completes) {
1348                 if (!set->set_success)
1349                         rc = -EIO;
1350                 /* FIXME update qos data here */
1351         }
1352
1353         if (atomic_dec_and_test(&set->set_refcount))
1354                 lov_finish_set(set);
1355
1356         RETURN(rc);
1357 }
1358
1359 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1360                       struct lov_stripe_md *lsm, obd_off start,
1361                       obd_off end, struct lov_request_set **reqset)
1362 {
1363         struct lov_request_set *set;
1364         struct lov_oinfo *loi = NULL;
1365         struct lov_obd *lov = &exp->exp_obd->u.lov;
1366         int rc = 0, i;
1367         ENTRY;
1368
1369         OBD_ALLOC(set, sizeof(*set));
1370         if (set == NULL)
1371                 RETURN(-ENOMEM);
1372         lov_init_set(set);
1373
1374         set->set_exp = exp;
1375         set->set_md = lsm;
1376         set->set_oa = src_oa;
1377
1378         loi = lsm->lsm_oinfo;
1379         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1380                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1381                 struct lov_request *req;
1382                 obd_off rs, re;
1383
1384                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1385                         continue;
1386
1387                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1388                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1389                         continue;
1390                 }
1391
1392                 OBD_ALLOC(req, sizeof(*req));
1393                 if (req == NULL) {
1394                         lov_tgt_decref(lov, tgt);
1395                         GOTO(out_set, rc = -ENOMEM);
1396                 }
1397
1398                 req->rq_stripe = i;
1399                 req->rq_idx = loi->loi_ost_idx;
1400                 req->rq_gen = loi->loi_ost_gen;
1401
1402                 req->rq_oa = obdo_alloc();
1403                 if (req->rq_oa == NULL) {
1404                         OBD_FREE(req, sizeof(*req));
1405                         lov_tgt_decref(lov, tgt);
1406                         GOTO(out_set, rc = -ENOMEM);
1407                 }
1408                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1409                 req->rq_oa->o_id = loi->loi_id;
1410                 req->rq_extent.start = rs;
1411                 req->rq_extent.end = re;
1412                 lov_set_add_req(req, set);
1413         }
1414         if (!set->set_count)
1415                 GOTO(out_set, rc = -EIO);
1416         *reqset = set;
1417         RETURN(rc);
1418 out_set:
1419         lov_fini_sync_set(set);
1420         RETURN(rc);
1421 }