Whamcloud - gitweb
b=8007
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <asm/div64.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <linux/obd_class.h>
37 #include <linux/obd_lov.h>
38 #include <linux/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         INIT_LIST_HEAD(&set->set_list);
48         atomic_set(&set->set_refcount, 1);
49 }
50
51 static void lov_finish_set(struct lov_request_set *set)
52 {
53         struct list_head *pos, *n;
54         ENTRY;
55
56         LASSERT(set);
57         list_for_each_safe(pos, n, &set->set_list) {
58                 struct lov_request *req = list_entry(pos, struct lov_request,
59                                                      rq_link);
60                 list_del_init(&req->rq_link);
61
62                 if (req->rq_oa)
63                         obdo_free(req->rq_oa);
64                 if (req->rq_md)
65                         OBD_FREE(req->rq_md, req->rq_buflen);
66                 OBD_FREE(req, sizeof(*req));
67         }
68
69         if (set->set_pga) {
70                 int len = set->set_oabufs * sizeof(*set->set_pga);
71                 OBD_FREE(set->set_pga, len);
72         }
73         if (set->set_lockh)
74                 lov_llh_put(set->set_lockh);
75
76         OBD_FREE(set, sizeof(*set));
77         EXIT;
78 }
79
80 static void lov_update_set(struct lov_request_set *set,
81                            struct lov_request *req, int rc)
82 {
83         req->rq_complete = 1;
84         req->rq_rc = rc;
85
86         set->set_completes++;
87         if (rc == 0)
88                 set->set_success++;
89 }
90
91 int lov_update_common_set(struct lov_request_set *set,
92                           struct lov_request *req, int rc)
93 {
94         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
95         ENTRY;
96
97         lov_update_set(set, req, rc);
98
99         /* grace error on inactive ost */
100         if (rc && !lov->tgts[req->rq_idx].active)
101                 rc = 0;
102
103         /* FIXME in raid1 regime, should return 0 */
104         RETURN(rc);
105 }
106
107 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
108 {
109         list_add_tail(&req->rq_link, &set->set_list);
110         set->set_count++;
111 }
112
113 int lov_update_enqueue_set(struct lov_request_set *set,
114                            struct lov_request *req, int rc, int flags)
115 {
116         struct lustre_handle *lov_lockhp;
117         struct lov_oinfo *loi;
118         ENTRY;
119
120         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
121         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
122
123         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
124          * It belongs in the OSC, except that the OSC doesn't have
125          * access to the real LOI -- it gets a copy, that we created
126          * above, and that copy can be arbitrarily out of date.
127          *
128          * The LOV API is due for a serious rewriting anyways, and this
129          * can be addressed then. */
130         if (rc == ELDLM_OK) {
131                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
132                 __u64 tmp = req->rq_md->lsm_oinfo->loi_lvb.lvb_size;
133
134                 LASSERT(lock != NULL);
135                 lov_stripe_lock(set->set_md);
136                 loi->loi_lvb = req->rq_md->lsm_oinfo->loi_lvb;
137                 /* Extend KMS up to the end of this lock and no further
138                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
139                 if (tmp > lock->l_policy_data.l_extent.end)
140                         tmp = lock->l_policy_data.l_extent.end + 1;
141                 if (tmp >= loi->loi_kms) {
142                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
143                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
144                         loi->loi_kms = tmp;
145                         loi->loi_kms_valid = 1;
146                 } else {
147                         LDLM_DEBUG(lock, "lock acquired, setting rss="
148                                    LPU64"; leaving kms="LPU64", end="LPU64,
149                                    loi->loi_lvb.lvb_size, loi->loi_kms,
150                                    lock->l_policy_data.l_extent.end);
151                 }
152                 lov_stripe_unlock(set->set_md);
153                 ldlm_lock_allow_match(lock);
154                 LDLM_LOCK_PUT(lock);
155         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
156                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157                 lov_stripe_lock(set->set_md);
158                 loi->loi_lvb = req->rq_md->lsm_oinfo->loi_lvb;
159                 lov_stripe_unlock(set->set_md);
160                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
161                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
162                 rc = ELDLM_OK;
163         } else {
164                 struct obd_export *exp = set->set_exp;
165                 struct lov_obd *lov = &exp->exp_obd->u.lov;
166
167                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168                 if (lov->tgts[req->rq_idx].active) {
169                         CERROR("error: enqueue objid "LPX64" subobj "
170                                 LPX64" on OST idx %d: rc = %d\n",
171                                 set->set_md->lsm_object_id, loi->loi_id,
172                                 loi->loi_ost_idx, rc);
173                 } else {
174                         rc = ELDLM_OK;
175                 }
176         }
177         lov_update_set(set, req, rc);
178         RETURN(rc);
179 }
180
181 static int enqueue_done(struct lov_request_set *set, __u32 mode)
182 {
183         struct lov_request *req;
184         struct lustre_handle *lov_lockhp = NULL;
185         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
186         int rc = 0;
187         ENTRY;
188
189         LASSERT(set->set_completes);
190         /* enqueue/match success, just return */
191         if (set->set_completes == set->set_success)
192                 RETURN(0);
193
194         /* cancel enqueued/matched locks */
195         list_for_each_entry(req, &set->set_list, rq_link) {
196                 if (!req->rq_complete || req->rq_rc)
197                         continue;
198
199                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
200                 LASSERT(lov_lockhp);
201                 if (!lustre_handle_is_used(lov_lockhp))
202                         continue;
203
204                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
205                                 mode, lov_lockhp);
206                 if (rc && lov->tgts[req->rq_idx].active)
207                         CERROR("cancelling obdjid "LPX64" on OST "
208                                "idx %d error: rc = %d\n",
209                                req->rq_md->lsm_object_id, req->rq_idx, rc);
210         }
211         lov_llh_put(set->set_lockh);
212         RETURN(rc);
213 }
214
215 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
216 {
217         int rc = 0;
218         ENTRY;
219
220         if (set == NULL)
221                 RETURN(0);
222         LASSERT(set->set_exp);
223         if (set->set_completes)
224                 rc = enqueue_done(set, mode);
225         else
226                 lov_llh_put(set->set_lockh);
227
228         if (atomic_dec_and_test(&set->set_refcount))
229                 lov_finish_set(set);
230
231         RETURN(rc);
232 }
233
234 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
235                          ldlm_policy_data_t *policy, __u32 mode,
236                          struct lustre_handle *lockh,
237                          struct lov_request_set **reqset)
238 {
239         struct lov_obd *lov = &exp->exp_obd->u.lov;
240         struct lov_request_set *set;
241         int i, rc = 0;
242         struct lov_oinfo *loi;
243         ENTRY;
244
245         OBD_ALLOC(set, sizeof(*set));
246         if (set == NULL)
247                 RETURN(-ENOMEM);
248         lov_init_set(set);
249
250         set->set_exp = exp;
251         set->set_md = lsm;
252         set->set_lockh = lov_llh_new(lsm);
253         if (set->set_lockh == NULL)
254                 GOTO(out_set, rc = -ENOMEM);
255         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
256
257         loi = lsm->lsm_oinfo;
258         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
259                 struct lov_request *req;
260                 obd_off start, end;
261
262                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
263                                            policy->l_extent.end, &start, &end))
264                         continue;
265
266                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
267                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
268                         continue;
269                 }
270
271                 OBD_ALLOC(req, sizeof(*req));
272                 if (req == NULL)
273                         GOTO(out_set, rc = -ENOMEM);
274
275                 req->rq_buflen = sizeof(*req->rq_md) +
276                         sizeof(struct lov_oinfo);
277                 OBD_ALLOC(req->rq_md, req->rq_buflen);
278                 if (req->rq_md == NULL)
279                         GOTO(out_set, rc = -ENOMEM);
280
281                 req->rq_extent.start = start;
282                 req->rq_extent.end = end;
283                 req->rq_extent.gid = policy->l_extent.gid;
284
285                 req->rq_idx = loi->loi_ost_idx;
286                 req->rq_stripe = i;
287
288                 /* XXX LOV STACKING: submd should be from the subobj */
289                 req->rq_md->lsm_object_id = loi->loi_id;
290                 req->rq_md->lsm_stripe_count = 0;
291                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
292                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
293                 req->rq_md->lsm_oinfo->loi_lvb = loi->loi_lvb;
294
295                 lov_set_add_req(req, set);
296         }
297         if (!set->set_count)
298                 GOTO(out_set, rc = -EIO);
299         *reqset = set;
300         RETURN(0);
301 out_set:
302         lov_fini_enqueue_set(set, mode);
303         RETURN(rc);
304 }
305
306 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
307                          int rc)
308 {
309         int ret = rc;
310         ENTRY;
311
312         if (rc == 1)
313                 ret = 0;
314         lov_update_set(set, req, ret);
315         RETURN(rc);
316 }
317
318 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
319 {
320         int rc = 0;
321         ENTRY;
322
323         if (set == NULL)
324                 RETURN(0);
325         LASSERT(set->set_exp);
326         if (set->set_completes) {
327                 if (set->set_count == set->set_success &&
328                     flags & LDLM_FL_TEST_LOCK)
329                         lov_llh_put(set->set_lockh);
330                 rc = enqueue_done(set, mode);
331         } else {
332                 lov_llh_put(set->set_lockh);
333         }
334
335         if (atomic_dec_and_test(&set->set_refcount))
336                 lov_finish_set(set);
337
338         RETURN(rc);
339 }
340
341 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
342                        ldlm_policy_data_t *policy, __u32 mode,
343                        struct lustre_handle *lockh,
344                        struct lov_request_set **reqset)
345 {
346         struct lov_obd *lov = &exp->exp_obd->u.lov;
347         struct lov_request_set *set;
348         int i, rc = 0;
349         struct lov_oinfo *loi;
350         ENTRY;
351
352         OBD_ALLOC(set, sizeof(*set));
353         if (set == NULL)
354                 RETURN(-ENOMEM);
355         lov_init_set(set);
356
357         set->set_exp = exp;
358         set->set_md = lsm;
359         set->set_lockh = lov_llh_new(lsm);
360         if (set->set_lockh == NULL)
361                 GOTO(out_set, rc = -ENOMEM);
362         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
363
364         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
365                 struct lov_request *req;
366                 obd_off start, end;
367
368                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
369                                            policy->l_extent.end, &start, &end))
370                         continue;
371
372                 /* FIXME raid1 should grace this error */
373                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
374                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
375                         GOTO(out_set, rc = -EIO);
376                 }
377
378                 OBD_ALLOC(req, sizeof(*req));
379                 if (req == NULL)
380                         GOTO(out_set, rc = -ENOMEM);
381
382                 req->rq_buflen = sizeof(*req->rq_md);
383                 OBD_ALLOC(req->rq_md, req->rq_buflen);
384                 if (req->rq_md == NULL)
385                         GOTO(out_set, rc = -ENOMEM);
386
387                 req->rq_extent.start = start;
388                 req->rq_extent.end = end;
389                 req->rq_extent.gid = policy->l_extent.gid;
390
391                 req->rq_idx = loi->loi_ost_idx;
392                 req->rq_stripe = i;
393
394                 /* XXX LOV STACKING: submd should be from the subobj */
395                 req->rq_md->lsm_object_id = loi->loi_id;
396                 req->rq_md->lsm_stripe_count = 0;
397
398                 lov_set_add_req(req, set);
399         }
400         if (!set->set_count)
401                 GOTO(out_set, rc = -EIO);
402         *reqset = set;
403         RETURN(rc);
404 out_set:
405         lov_fini_match_set(set, mode, 0);
406         RETURN(rc);
407 }
408
409 int lov_fini_cancel_set(struct lov_request_set *set)
410 {
411         int rc = 0;
412         ENTRY;
413
414         if (set == NULL)
415                 RETURN(0);
416
417         LASSERT(set->set_exp);
418         if (set->set_lockh)
419                 lov_llh_put(set->set_lockh);
420
421         if (atomic_dec_and_test(&set->set_refcount))
422                 lov_finish_set(set);
423
424         RETURN(rc);
425 }
426
427 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
428                         __u32 mode, struct lustre_handle *lockh,
429                         struct lov_request_set **reqset)
430 {
431         struct lov_request_set *set;
432         int i, rc = 0;
433         struct lov_oinfo *loi;
434         ENTRY;
435
436         OBD_ALLOC(set, sizeof(*set));
437         if (set == NULL)
438                 RETURN(-ENOMEM);
439         lov_init_set(set);
440
441         set->set_exp = exp;
442         set->set_md = lsm;
443         set->set_lockh = lov_handle2llh(lockh);
444         if (set->set_lockh == NULL) {
445                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
446                 GOTO(out_set, rc = -EINVAL);
447         }
448         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
449
450         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
451                 struct lov_request *req;
452                 struct lustre_handle *lov_lockhp;
453
454                 lov_lockhp = set->set_lockh->llh_handles + i;
455                 if (!lustre_handle_is_used(lov_lockhp)) {
456                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
457                                loi->loi_ost_idx, loi->loi_id);
458                         continue;
459                 }
460
461                 OBD_ALLOC(req, sizeof(*req));
462                 if (req == NULL)
463                         GOTO(out_set, rc = -ENOMEM);
464
465                 req->rq_buflen = sizeof(*req->rq_md);
466                 OBD_ALLOC(req->rq_md, req->rq_buflen);
467                 if (req->rq_md == NULL)
468                         GOTO(out_set, rc = -ENOMEM);
469
470                 req->rq_idx = loi->loi_ost_idx;
471                 req->rq_stripe = i;
472
473                 /* XXX LOV STACKING: submd should be from the subobj */
474                 req->rq_md->lsm_object_id = loi->loi_id;
475                 req->rq_md->lsm_stripe_count = 0;
476
477                 lov_set_add_req(req, set);
478         }
479         if (!set->set_count)
480                 GOTO(out_set, rc = -EIO);
481         *reqset = set;
482         RETURN(rc);
483 out_set:
484         lov_fini_cancel_set(set);
485         RETURN(rc);
486 }
487
488 static int create_done(struct obd_export *exp, struct lov_request_set *set,
489                        struct lov_stripe_md **lsmp)
490 {
491         struct lov_obd *lov = &exp->exp_obd->u.lov;
492         struct obd_trans_info *oti = set->set_oti;
493         struct obdo *src_oa = set->set_oa;
494         struct lov_request *req;
495         struct obdo *ret_oa = NULL;
496         int attrset = 0, rc = 0;
497         ENTRY;
498
499         LASSERT(set->set_completes);
500
501         /* try alloc objects on other osts if osc_create fails for
502          * exceptions: RPC failure, ENOSPC, etc */
503         if (set->set_count != set->set_success) {
504                 list_for_each_entry (req, &set->set_list, rq_link) {
505                         if (req->rq_rc == 0)
506                                 continue;
507                         
508                         set->set_completes--;
509                         req->rq_complete = 0;
510                         
511                         rc = qos_remedy_create(set, req);
512                         lov_update_create_set(set, req, rc);
513
514                         if (rc)
515                                 break;
516                 }
517         }
518
519         /* no successful creates */
520         if (set->set_success == 0)
521                 GOTO(cleanup, rc);
522         
523         /* If there was an explicit stripe set, fail.  Otherwise, we
524          * got some objects and that's not bad. */
525         if (set->set_count != set->set_success) {
526                 if (*lsmp)
527                         GOTO(cleanup, rc);
528                 set->set_count = set->set_success;
529                 qos_shrink_lsm(set);
530         }
531
532         ret_oa = obdo_alloc();
533         if (ret_oa == NULL)
534                 GOTO(cleanup, rc = -ENOMEM);
535
536         list_for_each_entry(req, &set->set_list, rq_link) {
537                 if (!req->rq_complete || req->rq_rc)
538                         continue;
539                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
540                                 set->set_md, req->rq_stripe, &attrset);
541         }
542         if (src_oa->o_valid & OBD_MD_FLSIZE &&
543             ret_oa->o_size != src_oa->o_size) {
544                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
545                        src_oa->o_size, ret_oa->o_size);
546                 LBUG();
547         }
548         ret_oa->o_id = src_oa->o_id;
549         memcpy(src_oa, ret_oa, sizeof(*src_oa));
550         obdo_free(ret_oa);
551
552         *lsmp = set->set_md;
553         GOTO(done, rc = 0);
554
555 cleanup:
556         list_for_each_entry(req, &set->set_list, rq_link) {
557                 struct obd_export *sub_exp;
558                 int err = 0;
559
560                 if (!req->rq_complete || req->rq_rc)
561                         continue;
562
563                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
564                 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti, NULL);
565                 if (err)
566                         CERROR("Failed to uncreate objid "LPX64" subobj "
567                                LPX64" on OST idx %d: rc = %d\n",
568                                set->set_oa->o_id, req->rq_oa->o_id,
569                                req->rq_idx, rc);
570         }
571         if (*lsmp == NULL)
572                 obd_free_memmd(exp, &set->set_md);
573 done:
574         if (oti && set->set_cookies) {
575                 oti->oti_logcookies = set->set_cookies;
576                 if (!set->set_cookie_sent) {
577                         oti_free_cookies(oti);
578                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
579                 } else {
580                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
581                 }
582         }
583         RETURN(rc);
584 }
585
586 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
587 {
588         int rc = 0;
589         ENTRY;
590
591         if (set == NULL)
592                 RETURN(0);
593         LASSERT(set->set_exp);
594         if (set->set_completes) {
595                 rc = create_done(set->set_exp, set, lsmp);
596                 /* FIXME update qos data here */
597         }
598
599         if (atomic_dec_and_test(&set->set_refcount))
600                 lov_finish_set(set);
601
602         RETURN(rc);
603 }
604
605 int lov_update_create_set(struct lov_request_set *set,
606                           struct lov_request *req, int rc)
607 {
608         struct obd_trans_info *oti = set->set_oti;
609         struct lov_stripe_md *lsm = set->set_md;
610         struct lov_oinfo *loi;
611         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
612         ENTRY;
613
614         req->rq_stripe = set->set_success;
615         loi = &lsm->lsm_oinfo[req->rq_stripe];
616
617         if (rc && lov->tgts[req->rq_idx].active) {
618                 CERROR("error creating fid "LPX64" sub-object"
619                        " on OST idx %d/%d: rc = %d\n",
620                        set->set_oa->o_id, req->rq_idx,
621                        lsm->lsm_stripe_count, rc);
622                 if (rc > 0) {
623                         CERROR("obd_create returned invalid err %d\n", rc);
624                         rc = -EIO;
625                 }
626         }
627         lov_update_set(set, req, rc);
628         if (rc)
629                 RETURN(rc);
630
631         if (oti && oti->oti_objid)
632                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
633
634         loi->loi_id = req->rq_oa->o_id;
635         loi->loi_ost_idx = req->rq_idx;
636         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
637                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
638         loi_init(loi);
639
640         if (set->set_cookies)
641                 ++oti->oti_logcookies;
642         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
643                 set->set_cookie_sent++;
644
645         RETURN(0);
646 }
647
648 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
649                         struct obdo *src_oa, struct obd_trans_info *oti,
650                         struct lov_request_set **reqset)
651 {
652         struct lov_obd *lov = &exp->exp_obd->u.lov;
653         struct lov_request_set *set;
654         int rc = 0, newea = 0;
655         ENTRY;
656
657         OBD_ALLOC(set, sizeof(*set));
658         if (set == NULL)
659                 RETURN(-ENOMEM);
660         lov_init_set(set);
661
662         set->set_exp = exp;
663         set->set_md = *lsmp;
664         set->set_oa = src_oa;
665         set->set_oti = oti;
666
667         if (set->set_md == NULL) {
668                 int stripes, stripe_cnt;
669                 stripe_cnt = lov_get_stripecnt(lov, 0);
670
671                 /* If the MDS file was truncated up to some size, stripe over
672                  * enough OSTs to allow the file to be created at that size. */
673                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
674                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
675                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
676
677                         if (stripes > lov->desc.ld_active_tgt_count)
678                                 GOTO(out_set, rc = -EFBIG);
679                         if (stripes < stripe_cnt)
680                                 stripes = stripe_cnt;
681                 } else {
682                         stripes = stripe_cnt;
683                 }
684
685                 rc = lov_alloc_memmd(&set->set_md, stripes,
686                                      lov->desc.ld_pattern ?
687                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0, 
688                                      LOV_MAGIC);
689                 if (rc < 0)
690                         goto out_set;
691                 newea = 1;
692         }
693
694         rc = qos_prep_create(lov, set, newea);
695         if (rc)
696                 goto out_lsm;
697
698         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
699                 oti_alloc_cookies(oti, set->set_count);
700                 if (!oti->oti_logcookies)
701                         goto out_lsm;
702                 set->set_cookies = oti->oti_logcookies;
703         }
704         *reqset = set;
705         RETURN(rc);
706
707 out_lsm:
708         if (*lsmp == NULL)
709                 obd_free_memmd(exp, &set->set_md);
710 out_set:
711         lov_fini_create_set(set, lsmp);
712         RETURN(rc);
713 }
714
715 static int common_attr_done(struct lov_request_set *set)
716 {
717         struct list_head *pos;
718         struct lov_request *req;
719         struct obdo *tmp_oa;
720         int rc = 0, attrset = 0;
721         ENTRY;
722
723         if (set->set_oa == NULL)
724                 RETURN(0);
725
726         if (!set->set_success)
727                 RETURN(-EIO);
728
729         tmp_oa = obdo_alloc();
730         if (tmp_oa == NULL)
731                 GOTO(out, rc = -ENOMEM);
732
733         list_for_each (pos, &set->set_list) {
734                 req = list_entry(pos, struct lov_request, rq_link);
735
736                 if (!req->rq_complete || req->rq_rc)
737                         continue;
738                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
739                         continue;
740                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
741                                 set->set_md, req->rq_stripe, &attrset);
742         }
743         if (!attrset) {
744                 CERROR("No stripes had valid attrs\n");
745                 rc = -EIO;
746         }
747         tmp_oa->o_id = set->set_oa->o_id;
748         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
749 out:
750         if (tmp_oa)
751                 obdo_free(tmp_oa);
752         RETURN(rc);
753
754 }
755
756 static int brw_done(struct lov_request_set *set)
757 {
758         struct lov_stripe_md *lsm = set->set_md;
759         struct lov_oinfo     *loi = NULL;
760         struct list_head *pos;
761         struct lov_request *req;
762         ENTRY;
763
764         list_for_each (pos, &set->set_list) {
765                 req = list_entry(pos, struct lov_request, rq_link);
766
767                 if (!req->rq_complete || req->rq_rc)
768                         continue;
769
770                 loi = &lsm->lsm_oinfo[req->rq_stripe];
771
772                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
773                         loi->loi_lvb.lvb_blocks = req->rq_oa->o_blocks;
774         }
775
776         RETURN(0);
777 }
778
779 int lov_fini_brw_set(struct lov_request_set *set)
780 {
781         int rc = 0;
782         ENTRY;
783
784         if (set == NULL)
785                 RETURN(0);
786         LASSERT(set->set_exp);
787         if (set->set_completes) {
788                 rc = brw_done(set);
789                 /* FIXME update qos data here */
790         }
791         if (atomic_dec_and_test(&set->set_refcount))
792                 lov_finish_set(set);
793
794         RETURN(rc);
795 }
796
797 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
798                      struct lov_stripe_md *lsm, obd_count oa_bufs,
799                      struct brw_page *pga, struct obd_trans_info *oti,
800                      struct lov_request_set **reqset)
801 {
802         struct {
803                 obd_count       index;
804                 obd_count       count;
805                 obd_count       off;
806         } *info = NULL;
807         struct lov_request_set *set;
808         struct lov_oinfo *loi = NULL;
809         struct lov_obd *lov = &exp->exp_obd->u.lov;
810         int rc = 0, i, shift;
811         ENTRY;
812
813         OBD_ALLOC(set, sizeof(*set));
814         if (set == NULL)
815                 RETURN(-ENOMEM);
816         lov_init_set(set);
817
818         set->set_exp = exp;
819         set->set_md = lsm;
820         set->set_oa = src_oa;
821         set->set_oti = oti;
822         set->set_oabufs = oa_bufs;
823         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
824         if (!set->set_pga)
825                 GOTO(out, rc = -ENOMEM);
826
827         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
828         if (!info)
829                 GOTO(out, rc = -ENOMEM);
830
831         /* calculate the page count for each stripe */
832         for (i = 0; i < oa_bufs; i++) {
833                 int stripe = lov_stripe_number(lsm, pga[i].off);
834                 info[stripe].count++;
835         }
836
837         /* alloc and initialize lov request */
838         shift = 0;
839         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
840                 struct lov_request *req;
841
842                 if (info[i].count == 0)
843                         continue;
844
845                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
846                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
847                         GOTO(out, rc = -EIO);
848                 }
849
850                 OBD_ALLOC(req, sizeof(*req));
851                 if (req == NULL)
852                         GOTO(out, rc = -ENOMEM);
853
854                 req->rq_oa = obdo_alloc();
855                 if (req->rq_oa == NULL)
856                         GOTO(out, rc = -ENOMEM);
857
858                 if (src_oa)
859                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
860                 req->rq_oa->o_id = loi->loi_id;
861                 req->rq_oa->o_stripe_idx = i;
862
863                 req->rq_buflen = sizeof(*req->rq_md);
864                 OBD_ALLOC(req->rq_md, req->rq_buflen);
865                 if (req->rq_md == NULL)
866                         GOTO(out, rc = -ENOMEM);
867
868                 req->rq_idx = loi->loi_ost_idx;
869                 req->rq_stripe = i;
870
871                 /* XXX LOV STACKING */
872                 req->rq_md->lsm_object_id = loi->loi_id;
873                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
874                 req->rq_oabufs = info[i].count;
875                 req->rq_pgaidx = shift;
876                 shift += req->rq_oabufs;
877
878                 /* remember the index for sort brw_page array */
879                 info[i].index = req->rq_pgaidx;
880
881                 lov_set_add_req(req, set);
882         }
883         if (!set->set_count)
884                 GOTO(out, rc = -EIO);
885
886         /* rotate & sort the brw_page array */
887         for (i = 0; i < oa_bufs; i++) {
888                 int stripe = lov_stripe_number(lsm, pga[i].off);
889
890                 shift = info[stripe].index + info[stripe].off;
891                 LASSERT(shift < oa_bufs);
892                 set->set_pga[shift] = pga[i];
893                 lov_stripe_offset(lsm, pga[i].off, stripe,
894                                   &set->set_pga[shift].off);
895                 info[stripe].off++;
896         }
897 out:
898         if (info)
899                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
900
901         if (rc == 0)
902                 *reqset = set;
903         else
904                 lov_fini_brw_set(set);
905
906         RETURN(rc);
907 }
908
909 int lov_fini_getattr_set(struct lov_request_set *set)
910 {
911         int rc = 0;
912         ENTRY;
913
914         if (set == NULL)
915                 RETURN(0);
916         LASSERT(set->set_exp);
917         if (set->set_completes)
918                 rc = common_attr_done(set);
919
920         if (atomic_dec_and_test(&set->set_refcount))
921                 lov_finish_set(set);
922
923         RETURN(rc);
924 }
925
926 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
927                          struct lov_stripe_md *lsm,
928                          struct lov_request_set **reqset)
929 {
930         struct lov_request_set *set;
931         struct lov_oinfo *loi = NULL;
932         struct lov_obd *lov = &exp->exp_obd->u.lov;
933         int rc = 0, i;
934         ENTRY;
935
936         OBD_ALLOC(set, sizeof(*set));
937         if (set == NULL)
938                 RETURN(-ENOMEM);
939         lov_init_set(set);
940
941         set->set_exp = exp;
942         set->set_md = lsm;
943         set->set_oa = src_oa;
944
945         loi = lsm->lsm_oinfo;
946         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
947                 struct lov_request *req;
948
949                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
950                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
951                         continue;
952                 }
953
954                 OBD_ALLOC(req, sizeof(*req));
955                 if (req == NULL)
956                         GOTO(out_set, rc = -ENOMEM);
957
958                 req->rq_stripe = i;
959                 req->rq_idx = loi->loi_ost_idx;
960
961                 req->rq_oa = obdo_alloc();
962                 if (req->rq_oa == NULL)
963                         GOTO(out_set, rc = -ENOMEM);
964                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
965                 req->rq_oa->o_id = loi->loi_id;
966
967                 lov_set_add_req(req, set);
968         }
969         if (!set->set_count)
970                 GOTO(out_set, rc = -EIO);
971         *reqset = set;
972         RETURN(rc);
973 out_set:
974         lov_fini_getattr_set(set);
975         RETURN(rc);
976 }
977
978 int lov_fini_destroy_set(struct lov_request_set *set)
979 {
980         ENTRY;
981
982         if (set == NULL)
983                 RETURN(0);
984         LASSERT(set->set_exp);
985         if (set->set_completes) {
986                 /* FIXME update qos data here */
987         }
988
989         if (atomic_dec_and_test(&set->set_refcount))
990                 lov_finish_set(set);
991
992         RETURN(0);
993 }
994
995 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
996                          struct lov_stripe_md *lsm,
997                          struct obd_trans_info *oti,
998                          struct lov_request_set **reqset)
999 {
1000         struct lov_request_set *set;
1001         struct lov_oinfo *loi = NULL;
1002         struct lov_obd *lov = &exp->exp_obd->u.lov;
1003         int rc = 0, cookie_set = 0, i;
1004         ENTRY;
1005
1006         OBD_ALLOC(set, sizeof(*set));
1007         if (set == NULL)
1008                 RETURN(-ENOMEM);
1009         lov_init_set(set);
1010
1011         set->set_exp = exp;
1012         set->set_md = lsm;
1013         set->set_oa = src_oa;
1014         set->set_oti = oti;
1015         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1016                 set->set_cookies = oti->oti_logcookies;
1017
1018         loi = lsm->lsm_oinfo;
1019         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1020                 struct lov_request *req;
1021
1022                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1023                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1024                         continue;
1025                 }
1026
1027                 OBD_ALLOC(req, sizeof(*req));
1028                 if (req == NULL)
1029                         GOTO(out_set, rc = -ENOMEM);
1030
1031                 req->rq_stripe = i;
1032                 req->rq_idx = loi->loi_ost_idx;
1033
1034                 req->rq_oa = obdo_alloc();
1035                 if (req->rq_oa == NULL)
1036                         GOTO(out_set, rc = -ENOMEM);
1037                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1038                 req->rq_oa->o_id = loi->loi_id;
1039
1040                 /* Setup the first request's cookie position */
1041                 if (!cookie_set && set->set_cookies) {
1042                         oti->oti_logcookies = set->set_cookies + i;
1043                         cookie_set = 1;
1044                 }
1045                 lov_set_add_req(req, set);
1046         }
1047         if (!set->set_count)
1048                 GOTO(out_set, rc = -EIO);
1049         *reqset = set;
1050         RETURN(rc);
1051 out_set:
1052         lov_fini_destroy_set(set);
1053         RETURN(rc);
1054 }
1055
1056 int lov_fini_setattr_set(struct lov_request_set *set)
1057 {
1058         int rc = 0;
1059         ENTRY;
1060
1061         if (set == NULL)
1062                 RETURN(0);
1063         LASSERT(set->set_exp);
1064         if (set->set_completes) {
1065                 rc = common_attr_done(set);
1066                 /* FIXME update qos data here */
1067         }
1068
1069         if (atomic_dec_and_test(&set->set_refcount))
1070                 lov_finish_set(set);
1071         RETURN(rc);
1072 }
1073
1074 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1075                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1076                          struct lov_request_set **reqset)
1077 {
1078         struct lov_request_set *set;
1079         struct lov_oinfo *loi = NULL;
1080         struct lov_obd *lov = &exp->exp_obd->u.lov;
1081         int rc = 0, i;
1082         ENTRY;
1083
1084         OBD_ALLOC(set, sizeof(*set));
1085         if (set == NULL)
1086                 RETURN(-ENOMEM);
1087         lov_init_set(set);
1088
1089         set->set_exp = exp;
1090         set->set_md = lsm;
1091         set->set_oa = src_oa;
1092
1093         loi = lsm->lsm_oinfo;
1094         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095                 struct lov_request *req;
1096
1097                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1098                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1099                         continue;
1100                 }
1101
1102                 OBD_ALLOC(req, sizeof(*req));
1103                 if (req == NULL)
1104                         GOTO(out_set, rc = -ENOMEM);
1105                 req->rq_stripe = i;
1106                 req->rq_idx = loi->loi_ost_idx;
1107
1108                 req->rq_oa = obdo_alloc();
1109                 if (req->rq_oa == NULL)
1110                         GOTO(out_set, rc = -ENOMEM);
1111                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1112                 req->rq_oa->o_id = loi->loi_id;
1113                 req->rq_oa->o_stripe_idx = i;
1114
1115                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1116                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1117                                               &req->rq_oa->o_size) < 0 &&
1118                             req->rq_oa->o_size)
1119                                 req->rq_oa->o_size--;
1120                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1121                                i, req->rq_oa->o_size, src_oa->o_size);
1122                 }
1123                 lov_set_add_req(req, set);
1124         }
1125         if (!set->set_count)
1126                 GOTO(out_set, rc = -EIO);
1127         *reqset = set;
1128         RETURN(rc);
1129 out_set:
1130         lov_fini_setattr_set(set);
1131         RETURN(rc);
1132 }
1133
1134 int lov_update_setattr_set(struct lov_request_set *set,
1135                            struct lov_request *req, int rc)
1136 {
1137         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1138         struct lov_stripe_md *lsm = set->set_md;
1139         ENTRY;
1140
1141         lov_update_set(set, req, rc);
1142
1143         /* grace error on inactive ost */
1144         if (rc && !lov->tgts[req->rq_idx].active)
1145                 rc = 0;
1146
1147         /* FIXME: LOV STACKING update loi data should be done by OSC *
1148          * when this is gone we can go back to using lov_update_common_set() */
1149         if (rc == 0) {
1150                 if (req->rq_oa->o_valid & OBD_MD_FLMTIME)
1151                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_ctime =
1152                                 req->rq_oa->o_ctime;
1153                 if (req->rq_oa->o_valid & OBD_MD_FLMTIME)
1154                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_mtime =
1155                                 req->rq_oa->o_mtime;
1156                 if (req->rq_oa->o_valid & OBD_MD_FLATIME)
1157                         lsm->lsm_oinfo[req->rq_stripe].loi_lvb.lvb_atime =
1158                                 req->rq_oa->o_atime;
1159         }
1160
1161         RETURN(rc);
1162 }
1163
1164 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1165                          int rc)
1166 {
1167         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1168         ENTRY;
1169
1170         lov_update_set(set, req, rc);
1171         if (rc && !lov->tgts[req->rq_idx].active)
1172                 rc = 0;
1173         /* FIXME in raid1 regime, should return 0 */
1174         RETURN(rc);
1175 }
1176
1177 int lov_fini_punch_set(struct lov_request_set *set)
1178 {
1179         int rc = 0;
1180         ENTRY;
1181
1182         if (set == NULL)
1183                 RETURN(0);
1184         LASSERT(set->set_exp);
1185         if (set->set_completes) {
1186                 if (!set->set_success)
1187                         rc = -EIO;
1188                 /* FIXME update qos data here */
1189         }
1190
1191         if (atomic_dec_and_test(&set->set_refcount))
1192                 lov_finish_set(set);
1193
1194         RETURN(rc);
1195 }
1196
1197 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1198                        struct lov_stripe_md *lsm, obd_off start,
1199                        obd_off end, struct obd_trans_info *oti,
1200                        struct lov_request_set **reqset)
1201 {
1202         struct lov_request_set *set;
1203         struct lov_oinfo *loi = NULL;
1204         struct lov_obd *lov = &exp->exp_obd->u.lov;
1205         int rc = 0, i;
1206         ENTRY;
1207
1208         OBD_ALLOC(set, sizeof(*set));
1209         if (set == NULL)
1210                 RETURN(-ENOMEM);
1211         lov_init_set(set);
1212
1213         set->set_exp = exp;
1214         set->set_md = lsm;
1215         set->set_oa = src_oa;
1216
1217         loi = lsm->lsm_oinfo;
1218         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1219                 struct lov_request *req;
1220                 obd_off rs, re;
1221
1222                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1223                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1224                         continue;
1225                 }
1226
1227                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1228                         continue;
1229
1230                 OBD_ALLOC(req, sizeof(*req));
1231                 if (req == NULL)
1232                         GOTO(out_set, rc = -ENOMEM);
1233                 req->rq_stripe = i;
1234                 req->rq_idx = loi->loi_ost_idx;
1235
1236                 req->rq_oa = obdo_alloc();
1237                 if (req->rq_oa == NULL)
1238                         GOTO(out_set, rc = -ENOMEM);
1239                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1240                 req->rq_oa->o_id = loi->loi_id;
1241                 req->rq_oa->o_stripe_idx = i;
1242
1243                 req->rq_extent.start = rs;
1244                 req->rq_extent.end = re;
1245                 req->rq_extent.gid = -1;
1246
1247                 lov_set_add_req(req, set);
1248         }
1249         if (!set->set_count)
1250                 GOTO(out_set, rc = -EIO);
1251         *reqset = set;
1252         RETURN(rc);
1253 out_set:
1254         lov_fini_punch_set(set);
1255         RETURN(rc);
1256 }
1257
1258 int lov_fini_sync_set(struct lov_request_set *set)
1259 {
1260         int rc = 0;
1261         ENTRY;
1262
1263         if (set == NULL)
1264                 RETURN(0);
1265         LASSERT(set->set_exp);
1266         if (set->set_completes) {
1267                 if (!set->set_success)
1268                         rc = -EIO;
1269                 /* FIXME update qos data here */
1270         }
1271
1272         if (atomic_dec_and_test(&set->set_refcount))
1273                 lov_finish_set(set);
1274
1275         RETURN(rc);
1276 }
1277
1278 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1279                       struct lov_stripe_md *lsm, obd_off start,
1280                       obd_off end, struct lov_request_set **reqset)
1281 {
1282         struct lov_request_set *set;
1283         struct lov_oinfo *loi = NULL;
1284         struct lov_obd *lov = &exp->exp_obd->u.lov;
1285         int rc = 0, i;
1286         ENTRY;
1287
1288         OBD_ALLOC(set, sizeof(*set));
1289         if (set == NULL)
1290                 RETURN(-ENOMEM);
1291         lov_init_set(set);
1292
1293         set->set_exp = exp;
1294         set->set_md = lsm;
1295         set->set_oa = src_oa;
1296
1297         loi = lsm->lsm_oinfo;
1298         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1299                 struct lov_request *req;
1300                 obd_off rs, re;
1301
1302                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1303                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1304                         continue;
1305                 }
1306
1307                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1308                         continue;
1309
1310                 OBD_ALLOC(req, sizeof(*req));
1311                 if (req == NULL)
1312                         GOTO(out_set, rc = -ENOMEM);
1313                 req->rq_stripe = i;
1314                 req->rq_idx = loi->loi_ost_idx;
1315
1316                 req->rq_oa = obdo_alloc();
1317                 if (req->rq_oa == NULL)
1318                         GOTO(out_set, rc = -ENOMEM);
1319                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1320                 req->rq_oa->o_id = loi->loi_id;
1321                 req->rq_oa->o_stripe_idx = i;
1322
1323                 req->rq_extent.start = rs;
1324                 req->rq_extent.end = re;
1325                 req->rq_extent.gid = -1;
1326
1327                 lov_set_add_req(req, set);
1328         }
1329         if (!set->set_count)
1330                 GOTO(out_set, rc = -EIO);
1331         *reqset = set;
1332         RETURN(rc);
1333 out_set:
1334         lov_fini_sync_set(set);
1335         RETURN(rc);
1336 }