Whamcloud - gitweb
d5eac9b47cb9beefde07c5b7140c59d22917bde3
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #include <asm/div64.h>
29 #else
30 #include <liblustre.h>
31 #endif
32
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
36
37 #include "lov_internal.h"
38
39 static void lov_init_set(struct lov_request_set *set)
40 {
41         set->set_count = 0;
42         set->set_completes = 0;
43         set->set_success = 0;
44         INIT_LIST_HEAD(&set->set_list);
45         atomic_set(&set->set_refcount, 1);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
51         struct list_head *pos, *n;
52         ENTRY;
53
54         LASSERT(set);
55         list_for_each_safe(pos, n, &set->set_list) {
56                 struct lov_request *req;
57                 struct lov_tgt_desc *tgt;
58
59                 req = list_entry(pos, struct lov_request, rq_link);
60                 LASSERT(req->rq_idx >= 0);
61
62                 tgt = lov->tgts + req->rq_idx;
63                 lov_tgt_decref(lov, tgt);
64
65                 list_del_init(&req->rq_link);
66
67                 if (req->rq_oa)
68                         obdo_free(req->rq_oa);
69                 if (req->rq_md)
70                         OBD_FREE(req->rq_md, req->rq_buflen);
71                 OBD_FREE(req, sizeof(*req));
72         }
73
74         if (set->set_pga) {
75                 int len = set->set_oabufs * sizeof(*set->set_pga);
76                 OBD_FREE(set->set_pga, len);
77         }
78         if (set->set_lockh)
79                 lov_llh_put(set->set_lockh);
80
81         OBD_FREE(set, sizeof(*set));
82         EXIT;
83 }
84
85 static void lov_update_set(struct lov_request_set *set,
86                            struct lov_request *req, int rc)
87 {
88         req->rq_complete = 1;
89         req->rq_rc = rc;
90
91         set->set_completes++;
92         if (rc == 0)
93                 set->set_success++;
94 }
95
96 int lov_update_common_set(struct lov_request_set *set,
97                           struct lov_request *req, int rc)
98 {
99         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
100         ENTRY;
101
102         lov_update_set(set, req, rc);
103
104         /* grace error on inactive ost */
105         if (rc) {
106                 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
107
108                 if (lov_tgt_active(lov, tgt, req->rq_gen))
109                         lov_tgt_decref(lov, tgt);
110                 else
111                         rc = 0;
112         }
113
114         /* FIXME in raid1 regime, should return 0 */
115         RETURN(rc);
116 }
117
118 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
119 {
120         list_add_tail(&req->rq_link, &set->set_list);
121         set->set_count++;
122 }
123
124 int lov_update_enqueue_set(struct lov_request_set *set,
125                            struct lov_request *req, int rc, int flags)
126 {
127         struct lustre_handle *lov_lockhp;
128         struct lov_oinfo *loi;
129         ENTRY;
130
131         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
132         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
133
134         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
135          * It belongs in the OSC, except that the OSC doesn't have
136          * access to the real LOI -- it gets a copy, that we created
137          * above, and that copy can be arbitrarily out of date.
138          *
139          * The LOV API is due for a serious rewriting anyways, and this
140          * can be addressed then. */
141         if (rc == ELDLM_OK) {
142                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
143                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
144
145                 LASSERT(lock != NULL);
146                 loi->loi_rss = tmp;
147                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
148                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
149                 /* Extend KMS up to the end of this lock and no further
150                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
151                 if (tmp > lock->l_policy_data.l_extent.end)
152                         tmp = lock->l_policy_data.l_extent.end + 1;
153                 if (tmp >= loi->loi_kms) {
154                         CDEBUG(D_INODE, "lock acquired, setting rss="
155                                LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
156                         loi->loi_kms = tmp;
157                         loi->loi_kms_valid = 1;
158                 } else {
159                         CDEBUG(D_INODE, "lock acquired, setting rss="
160                                LPU64"; leaving kms="LPU64", end="LPU64
161                                "\n", loi->loi_rss, loi->loi_kms,
162                                lock->l_policy_data.l_extent.end);
163                 }
164                 ldlm_lock_allow_match(lock);
165                 LDLM_LOCK_PUT(lock);
166         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
167                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
169                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
170                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
171                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
172                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
173                 rc = ELDLM_OK;
174         } else {
175                 struct obd_export *exp = set->set_exp;
176                 struct lov_obd *lov = &exp->exp_obd->u.lov;
177                 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
178
179                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
180                 if (lov_tgt_ready(lov, tgt, req->rq_gen)) {
181                         lov_tgt_decref(lov, tgt);
182                         CERROR("error: enqueue objid "LPX64" subobj "
183                                 LPX64" on OST idx %d: rc = %d\n",
184                                 set->set_md->lsm_object_id, loi->loi_id,
185                                 loi->loi_ost_idx, rc);
186                 } else {
187                         CERROR("error: enqueue objid "LPX64" subobj "
188                                 LPX64" on OST idx %d: rc = %d, NOT ACTIVE !\n",
189                                 set->set_md->lsm_object_id, loi->loi_id,
190                                 loi->loi_ost_idx, rc);
191                         rc = ELDLM_OK;
192                 }
193         }
194         lov_update_set(set, req, rc);
195         RETURN(rc);
196 }
197
198 static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
199 {
200         struct list_head *pos;
201         struct lov_request *req;
202         struct lustre_handle *lov_lockhp = NULL;
203         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
204         struct lov_tgt_desc *tgt;
205         int rc = 0;
206         ENTRY;
207
208         LASSERT(set->set_completes);
209         /* enqueue/match success, just return */
210         if (set->set_completes == set->set_success) {
211                 if (flags & LDLM_FL_TEST_LOCK)
212                         lov_llh_put(set->set_lockh);
213                 RETURN(0);
214         }
215
216         /* cancel enqueued/matched locks */
217         list_for_each (pos, &set->set_list) {
218                 req = list_entry(pos, struct lov_request, rq_link);
219
220                 if (!req->rq_complete || req->rq_rc)
221                         continue;
222                 if (flags & LDLM_FL_TEST_LOCK)
223                         continue;
224
225                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
226                 LASSERT(lov_lockhp);
227                 if (lov_lockhp->cookie == 0)
228                         continue;
229
230                 tgt = lov->tgts + req->rq_idx;
231                 rc = obd_cancel(tgt->ltd_exp, req->rq_md, mode, lov_lockhp);
232                 if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
233                         lov_tgt_decref(lov, tgt);
234                         CERROR("cancelling obdjid "LPX64" on OST "
235                                "idx %d error: rc = %d\n",
236                                req->rq_md->lsm_object_id, req->rq_idx, rc);
237                 }
238         }
239         lov_llh_put(set->set_lockh);
240         RETURN(rc);
241 }
242
243 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
244 {
245         int rc = 0;
246         ENTRY;
247
248         LASSERT(set->set_exp);
249         if (set == NULL)
250                 RETURN(0);
251         if (set->set_completes)
252                 rc = enqueue_done(set, mode, 0);
253         else
254                 lov_llh_put(set->set_lockh);
255
256         if (atomic_dec_and_test(&set->set_refcount))
257                 lov_finish_set(set);
258
259         RETURN(rc);
260 }
261
262 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
263                          ldlm_policy_data_t *policy, __u32 mode,
264                          struct lustre_handle *lockh,
265                          struct lov_request_set **reqset)
266 {
267         struct lov_obd *lov = &exp->exp_obd->u.lov;
268         struct lov_request_set *set;
269         int i, rc = 0;
270         struct lov_oinfo *loi;
271         ENTRY;
272
273         OBD_ALLOC(set, sizeof(*set));
274         if (set == NULL)
275                 RETURN(-ENOMEM);
276         lov_init_set(set);
277
278         set->set_exp = exp;
279         set->set_md = lsm;
280         set->set_lockh = lov_llh_new(lsm);
281         if (set->set_lockh == NULL)
282                 GOTO(out_set, rc = -ENOMEM);
283         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
284
285         loi = lsm->lsm_oinfo;
286         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
287                 struct lov_tgt_desc *tgt;
288                 struct lov_request *req;
289                 obd_off start, end;
290
291                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
292                                            policy->l_extent.end, &start, &end))
293                         continue;
294
295                 tgt = lov->tgts + loi->loi_ost_idx;
296                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
297                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
298                         continue;
299                 }
300
301                 OBD_ALLOC(req, sizeof(*req));
302                 if (req == NULL) {
303                         lov_tgt_decref(lov, tgt);
304                         GOTO(out_set, rc = -ENOMEM);
305                 }
306
307                 req->rq_buflen = sizeof(*req->rq_md) +
308                         sizeof(struct lov_oinfo);
309                 OBD_ALLOC(req->rq_md, req->rq_buflen);
310                 if (req->rq_md == NULL) {
311                         OBD_FREE(req, sizeof(*req));
312                         lov_tgt_decref(lov, tgt);
313                         GOTO(out_set, rc = -ENOMEM);
314                 }
315
316                 req->rq_extent.start = start;
317                 req->rq_extent.end = end;
318                 req->rq_extent.gid = policy->l_extent.gid;
319
320                 req->rq_idx = loi->loi_ost_idx;
321                 req->rq_gen = loi->loi_ost_gen;
322                 req->rq_stripe = i;
323
324                 /* XXX LOV STACKING: submd should be from the subobj */
325                 req->rq_md->lsm_object_id = loi->loi_id;
326                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
327                 req->rq_md->lsm_stripe_count = 0;
328                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
329                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
330                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
331                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
332                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
333
334                 lov_set_add_req(req, set);
335         }
336         if (!set->set_count)
337                 GOTO(out_set, rc = -EIO);
338         *reqset = set;
339         RETURN(0);
340 out_set:
341         lov_fini_enqueue_set(set, mode);
342         RETURN(rc);
343 }
344
345 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
346                          int rc)
347 {
348         ENTRY;
349         lov_update_set(set, req, !rc);
350         RETURN(rc);
351 }
352
353 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
354 {
355         int rc = 0;
356         ENTRY;
357
358         LASSERT(set->set_exp);
359         if (set == NULL)
360                 RETURN(0);
361         if (set->set_completes)
362                 rc = enqueue_done(set, mode, flags);
363         else
364                 lov_llh_put(set->set_lockh);
365
366         if (atomic_dec_and_test(&set->set_refcount))
367                 lov_finish_set(set);
368
369         RETURN(rc);
370 }
371
372 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
373                        ldlm_policy_data_t *policy, __u32 mode,
374                        struct lustre_handle *lockh,
375                        struct lov_request_set **reqset)
376 {
377         struct lov_obd *lov = &exp->exp_obd->u.lov;
378         struct lov_request_set *set;
379         int i, rc = 0;
380         struct lov_oinfo *loi;
381         ENTRY;
382
383         OBD_ALLOC(set, sizeof(*set));
384         if (set == NULL)
385                 RETURN(-ENOMEM);
386         lov_init_set(set);
387
388         set->set_exp = exp;
389         set->set_md = lsm;
390         set->set_lockh = lov_llh_new(lsm);
391         if (set->set_lockh == NULL)
392                 GOTO(out_set, rc = -ENOMEM);
393         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
394
395         loi = lsm->lsm_oinfo;
396         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
397                 struct lov_tgt_desc *tgt;
398                 struct lov_request *req;
399                 obd_off start, end;
400
401                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
402                                            policy->l_extent.end, &start, &end))
403                         continue;
404
405                 /* FIXME raid1 should grace this error */
406                 tgt = lov->tgts + loi->loi_ost_idx;
407                 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
408                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
409                         GOTO(out_set, rc = -EIO);
410                 }
411
412                 OBD_ALLOC(req, sizeof(*req));
413                 if (req == NULL) {
414                         lov_tgt_decref(lov, tgt);
415                         GOTO(out_set, rc = -ENOMEM);
416                 }
417
418                 req->rq_buflen = sizeof(*req->rq_md);
419                 OBD_ALLOC(req->rq_md, req->rq_buflen);
420                 if (req->rq_md == NULL) {
421                         OBD_FREE(req, sizeof(*req));
422                         lov_tgt_decref(lov, tgt);
423                         GOTO(out_set, rc = -ENOMEM);
424                 }
425
426                 req->rq_extent.start = start;
427                 req->rq_extent.end = end;
428                 req->rq_extent.gid = policy->l_extent.gid;
429
430                 req->rq_idx = loi->loi_ost_idx;
431                 req->rq_gen = loi->loi_ost_gen;
432                 req->rq_stripe = i;
433
434                 /* XXX LOV STACKING: submd should be from the subobj */
435                 req->rq_md->lsm_object_id = loi->loi_id;
436                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
437                 req->rq_md->lsm_stripe_count = 0;
438                 lov_set_add_req(req, set);
439         }
440         if (!set->set_count)
441                 GOTO(out_set, rc = -EIO);
442         *reqset = set;
443         RETURN(rc);
444 out_set:
445         lov_fini_match_set(set, mode, 0);
446         RETURN(rc);
447 }
448
449 int lov_fini_cancel_set(struct lov_request_set *set)
450 {
451         int rc = 0;
452         ENTRY;
453
454         LASSERT(set->set_exp);
455         if (set == NULL)
456                 RETURN(0);
457
458         if (set->set_lockh)
459                 lov_llh_put(set->set_lockh);
460
461         if (atomic_dec_and_test(&set->set_refcount))
462                 lov_finish_set(set);
463
464         RETURN(rc);
465 }
466
467 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
468                         __u32 mode, struct lustre_handle *lockh,
469                         struct lov_request_set **reqset)
470 {
471         struct lov_obd *lov = &exp->exp_obd->u.lov;
472         struct lov_request_set *set;
473         int i, rc = 0;
474         struct lov_oinfo *loi;
475         ENTRY;
476
477         OBD_ALLOC(set, sizeof(*set));
478         if (set == NULL)
479                 RETURN(-ENOMEM);
480         lov_init_set(set);
481
482         set->set_exp = exp;
483         set->set_md = lsm;
484         set->set_lockh = lov_handle2llh(lockh);
485         if (set->set_lockh == NULL) {
486                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
487                 GOTO(out_set, rc = -EINVAL);
488         }
489         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
490
491         loi = lsm->lsm_oinfo;
492         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
493                 struct lov_tgt_desc *tgt;
494                 struct lov_request *req;
495                 struct lustre_handle *lov_lockhp;
496
497                 lov_lockhp = set->set_lockh->llh_handles + i;
498                 if (lov_lockhp->cookie == 0) {
499                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
500                                loi->loi_ost_idx, loi->loi_id);
501                         continue;
502                 }
503
504                 tgt = lov->tgts + loi->loi_ost_idx;
505                 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
506                         CERROR("lov idx %d subobj "LPX64" osc inactive?\n",
507                                loi->loi_ost_idx, loi->loi_id);
508                         continue;
509                 }
510
511                 OBD_ALLOC(req, sizeof(*req));
512                 if (req == NULL) {
513                         lov_tgt_decref(lov, tgt);
514                         GOTO(out_set, rc = -ENOMEM);
515                 }
516
517                 req->rq_buflen = sizeof(*req->rq_md);
518                 OBD_ALLOC(req->rq_md, req->rq_buflen);
519                 if (req->rq_md == NULL) {
520                         OBD_FREE(req, sizeof(*req));
521                         lov_tgt_decref(lov, tgt);
522                         GOTO(out_set, rc = -ENOMEM);
523                 }
524
525                 req->rq_idx = loi->loi_ost_idx;
526                 req->rq_gen = loi->loi_ost_gen;
527                 req->rq_stripe = i;
528
529                 /* XXX LOV STACKING: submd should be from the subobj */
530                 req->rq_md->lsm_object_id = loi->loi_id;
531                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
532                 req->rq_md->lsm_stripe_count = 0;
533                 lov_set_add_req(req, set);
534         }
535         if (!set->set_count)
536                 GOTO(out_set, rc = -EIO);
537         *reqset = set;
538         RETURN(rc);
539 out_set:
540         lov_fini_cancel_set(set);
541         RETURN(rc);
542 }
543
544 static int create_done(struct obd_export *exp, struct lov_request_set *set,
545                        struct lov_stripe_md **ea)
546 {
547         struct lov_obd *lov = &exp->exp_obd->u.lov;
548         struct obd_trans_info *oti = set->set_oti;
549         struct obdo *src_oa = set->set_oa;
550         struct list_head *pos;
551         struct lov_request *req;
552         struct obdo *ret_oa = NULL;
553         int attrset = 0, rc = 0;
554         ENTRY;
555
556         LASSERT(set->set_completes);
557
558         if (!set->set_success)
559                 GOTO(cleanup, rc = -EIO);
560
561         if (*ea == NULL && set->set_count != set->set_success) {
562                 set->set_count = set->set_success;
563                 qos_shrink_lsm(set);
564         }
565
566         ret_oa = obdo_alloc();
567         if (ret_oa == NULL)
568                 GOTO(cleanup, rc = -ENOMEM);
569
570         list_for_each (pos, &set->set_list) {
571                 req = list_entry(pos, struct lov_request, rq_link);
572                 if (!req->rq_complete || req->rq_rc)
573                         continue;
574                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
575                                 set->set_md, req->rq_stripe, &attrset);
576         }
577         if (src_oa->o_valid & OBD_MD_FLSIZE &&
578             ret_oa->o_size != src_oa->o_size) {
579                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
580                        src_oa->o_size, ret_oa->o_size);
581                 LBUG();
582         }
583         ret_oa->o_id = src_oa->o_id;
584         ret_oa->o_gr = src_oa->o_gr;
585         ret_oa->o_valid |= OBD_MD_FLGROUP;
586         memcpy(src_oa, ret_oa, sizeof(*src_oa));
587         obdo_free(ret_oa);
588
589         *ea = set->set_md;
590         GOTO(done, rc = 0);
591
592         EXIT;
593 cleanup:
594         list_for_each (pos, &set->set_list) {
595                 struct lov_tgt_desc *tgt;
596                 int err = 0;
597                 req = list_entry(pos, struct lov_request, rq_link);
598
599                 if (!req->rq_complete || req->rq_rc)
600                         continue;
601
602                 tgt = lov->tgts + req->rq_idx;
603                 if (!lov_tgt_ready(lov, tgt, req->rq_gen)) {
604                         CERROR("Failed to uncreate objid "LPX64" subobj "
605                                LPX64" on OST idx %d: osc inactive.\n",
606                                set->set_oa->o_id, req->rq_oa->o_id,
607                                req->rq_idx);
608                         continue;
609                 }
610
611                 err = obd_destroy(tgt->ltd_exp, req->rq_oa, NULL, oti);
612                 lov_tgt_decref(lov, tgt);
613                 if (err)
614                         CERROR("Failed to uncreate objid "LPX64" subobj "
615                                LPX64" on OST idx %d: rc = %d\n",
616                                set->set_oa->o_id, req->rq_oa->o_id,
617                                req->rq_idx, rc);
618         }
619         if (*ea == NULL)
620                 obd_free_memmd(exp, &set->set_md);
621 done:
622         if (oti && set->set_cookies) {
623                 oti->oti_logcookies = set->set_cookies;
624                 if (!set->set_cookie_sent) {
625                         oti_free_cookies(oti);
626                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
627                 } else {
628                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
629                 }
630         }
631         return rc;
632 }
633
634 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
635 {
636         int rc = 0;
637         ENTRY;
638
639         LASSERT(set->set_exp);
640         if (set == NULL)
641                 RETURN(0);
642         if (set->set_completes) {
643                 rc = create_done(set->set_exp, set, ea);
644                 /* FIXME update qos data here */
645         }
646
647         if (atomic_dec_and_test(&set->set_refcount))
648                 lov_finish_set(set);
649
650         RETURN(rc);
651 }
652
653 int lov_update_create_set(struct lov_request_set *set,
654                           struct lov_request *req, int rc)
655 {
656         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
657         struct obd_trans_info *oti = set->set_oti;
658         struct lov_stripe_md *lsm = set->set_md;
659         struct lov_oinfo *loi;
660         struct lov_tgt_desc *tgt;
661         ENTRY;
662
663         req->rq_stripe = set->set_success;
664         loi = &lsm->lsm_oinfo[req->rq_stripe];
665         tgt = lov->tgts + req->rq_idx;
666
667         if (rc && lov_tgt_ready(lov, tgt, req->rq_gen)) {
668                 lov_tgt_decref(lov, tgt);
669                 CERROR("error creating objid "LPX64" sub-object"
670                        " on OST idx %d/%d: rc = %d\n",
671                        set->set_oa->o_id, req->rq_idx,
672                        lsm->lsm_stripe_count, rc);
673                 if (rc > 0) {
674                         CERROR("obd_create returned invalid err %d\n", rc);
675                         rc = -EIO;
676                 }
677         }
678         lov_update_set(set, req, rc);
679         if (rc)
680                 RETURN(rc);
681
682         if (oti && oti->oti_objid)
683                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
684
685         loi->loi_id = req->rq_oa->o_id;
686         loi->loi_gr = req->rq_oa->o_gr;
687         loi->loi_ost_idx = req->rq_idx;
688         loi->loi_ost_gen = req->rq_gen;
689         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at "
690                "idx %d gen %d\n",
691                lsm->lsm_object_id, loi->loi_id, loi->loi_id,
692                req->rq_idx, req->rq_gen);
693         loi_init(loi);
694
695         if (set->set_cookies)
696                 ++oti->oti_logcookies;
697         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
698                 set->set_cookie_sent++;
699
700         RETURN(0);
701 }
702
703 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
704                         struct obdo *src_oa, struct obd_trans_info *oti,
705                         struct lov_request_set **reqset)
706 {
707         struct lov_obd *lov = &exp->exp_obd->u.lov;
708         struct lov_request_set *set;
709         int rc = 0, newea = 0;
710         ENTRY;
711
712         OBD_ALLOC(set, sizeof(*set));
713         if (set == NULL)
714                 RETURN(-ENOMEM);
715         lov_init_set(set);
716
717         set->set_exp = exp;
718         set->set_md = *ea;
719         set->set_oa = src_oa;
720         set->set_oti = oti;
721
722         if (set->set_md == NULL) {
723                 int stripes, stripe_cnt;
724                 stripe_cnt = lov_get_stripecnt(lov, 0);
725
726                 /* If the MDS file was truncated up to some size, stripe over
727                  * enough OSTs to allow the file to be created at that size. */
728                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
729                         stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
730                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
731
732                         if (stripes > lov->desc.ld_active_tgt_count)
733                                 GOTO(out_set, rc = -EFBIG);
734                         if (stripes < stripe_cnt)
735                                 stripes = stripe_cnt;
736                 } else {
737                         stripes = stripe_cnt;
738                 }
739
740                 rc = lov_alloc_memmd(&set->set_md, stripes,
741                                      lov->desc.ld_pattern ?
742                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
743                 if (rc < 0)
744                         goto out_set;
745                 newea = 1;
746         }
747
748         rc = qos_prep_create(lov, set, newea);
749         if (rc)
750                 goto out_lsm;
751
752         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
753                 oti_alloc_cookies(oti, set->set_count);
754                 if (!oti->oti_logcookies)
755                         goto out_lsm;
756                 set->set_cookies = oti->oti_logcookies;
757         }
758         *reqset = set;
759         RETURN(rc);
760
761 out_lsm:
762         if (*ea == NULL)
763                 obd_free_memmd(exp, &set->set_md);
764 out_set:
765         lov_fini_create_set(set, ea);
766         RETURN(rc);
767 }
768
769 static int common_attr_done(struct lov_request_set *set)
770 {
771         struct list_head *pos;
772         struct lov_request *req;
773         struct obdo *tmp_oa;
774         int rc = 0, attrset = 0;
775         ENTRY;
776
777         if (set->set_oa == NULL)
778                 RETURN(0);
779
780         if (!set->set_success)
781                 RETURN(-EIO);
782
783         tmp_oa = obdo_alloc();
784         if (tmp_oa == NULL)
785                 GOTO(out, rc = -ENOMEM);
786
787         list_for_each (pos, &set->set_list) {
788                 req = list_entry(pos, struct lov_request, rq_link);
789
790                 if (!req->rq_complete || req->rq_rc)
791                         continue;
792                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
793                         continue;
794                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
795                                 set->set_md, req->rq_stripe, &attrset);
796         }
797         if (!attrset) {
798                 CERROR("No stripes had valid attrs\n");
799                 rc = -EIO;
800         }
801         tmp_oa->o_id = set->set_oa->o_id;
802         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
803 out:
804         if (tmp_oa)
805                 obdo_free(tmp_oa);
806         RETURN(rc);
807
808 }
809
810 static int brw_done(struct lov_request_set *set)
811 {
812         struct lov_stripe_md *lsm = set->set_md;
813         struct lov_oinfo     *loi = NULL;
814         struct list_head *pos;
815         struct lov_request *req;
816         ENTRY;
817                                                                                                                              
818         list_for_each (pos, &set->set_list) {
819                 req = list_entry(pos, struct lov_request, rq_link);
820                                                                                                                              
821                 if (!req->rq_complete || req->rq_rc)
822                         continue;
823                                                                                                                              
824                 loi = &lsm->lsm_oinfo[req->rq_stripe];
825                                                                                                                              
826                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
827                         loi->loi_blocks = req->rq_oa->o_blocks;
828         }
829                                                                                                                              
830         RETURN(0);
831 }
832
833 int lov_fini_brw_set(struct lov_request_set *set)
834 {
835         int rc = 0;
836         ENTRY;
837
838         LASSERT(set->set_exp);
839         if (set == NULL)
840                 RETURN(0);
841         if (set->set_completes) {
842                 rc = brw_done(set);
843                 /* FIXME update qos data here */
844         }
845         if (atomic_dec_and_test(&set->set_refcount))
846                 lov_finish_set(set);
847
848         RETURN(rc);
849 }
850
851 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
852                      struct lov_stripe_md *lsm, obd_count oa_bufs,
853                      struct brw_page *pga, struct obd_trans_info *oti,
854                      struct lov_request_set **reqset)
855 {
856         struct {
857                 obd_count       index;
858                 obd_count       count;
859                 obd_count       off;
860         } *info = NULL;
861         struct lov_request_set *set;
862         struct lov_oinfo *loi = NULL;
863         struct lov_obd *lov = &exp->exp_obd->u.lov;
864         int rc = 0, i, shift;
865         ENTRY;
866
867         OBD_ALLOC(set, sizeof(*set));
868         if (set == NULL)
869                 RETURN(-ENOMEM);
870         lov_init_set(set);
871
872         set->set_exp = exp;
873         set->set_md = lsm;
874         set->set_oa = src_oa;
875         set->set_oti = oti;
876         set->set_oabufs = oa_bufs;
877         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
878         if (!set->set_pga)
879                 GOTO(out, rc = -ENOMEM);
880
881         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
882         if (!info)
883                 GOTO(out, rc = -ENOMEM);
884
885         /* calculate the page count for each stripe */
886         for (i = 0; i < oa_bufs; i++) {
887                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
888                 info[stripe].count++;
889         }
890
891         /* alloc and initialize lov request */
892         loi = lsm->lsm_oinfo;
893         shift = 0;
894         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
895                 struct lov_request *req;
896                 struct lov_tgt_desc *tgt;
897
898                 if (info[i].count == 0)
899                         continue;
900
901                 tgt = lov->tgts + loi->loi_ost_idx;
902                 if (!lov_tgt_ready(lov, tgt, loi->loi_ost_gen)) {
903                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
904                         GOTO(out, rc = -EIO);
905                 }
906
907                 OBD_ALLOC(req, sizeof(*req));
908                 if (req == NULL) {
909                         lov_tgt_decref(lov, tgt);
910                         GOTO(out, rc = -ENOMEM);
911                 }
912
913                 req->rq_oa = obdo_alloc();
914                 if (req->rq_oa == NULL) {
915                         OBD_FREE(req, sizeof(*req));
916                         lov_tgt_decref(lov, tgt);
917                         GOTO(out, rc = -ENOMEM);
918                 }
919
920                 if (src_oa)
921                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
922                 req->rq_oa->o_id = loi->loi_id;
923                 req->rq_buflen = sizeof(*req->rq_md);
924                 OBD_ALLOC(req->rq_md, req->rq_buflen);
925                 if (req->rq_md == NULL) {
926                         obdo_free(req->rq_oa);
927                         OBD_FREE(req, sizeof(*req));
928                         lov_tgt_decref(lov, tgt);
929                         GOTO(out, rc = -ENOMEM);
930                 }
931
932                 req->rq_idx = loi->loi_ost_idx;
933                 req->rq_gen = loi->loi_ost_gen;
934                 req->rq_stripe = i;
935
936                 /* XXX LOV STACKING */
937                 req->rq_md->lsm_object_id = loi->loi_id;
938                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
939                 req->rq_oabufs = info[i].count;
940                 req->rq_pgaidx = shift;
941                 shift += req->rq_oabufs;
942
943                 /* remember the index for sort brw_page array */
944                 info[i].index = req->rq_pgaidx;
945                 lov_set_add_req(req, set);
946         }
947         if (!set->set_count)
948                 GOTO(out, rc = -EIO);
949
950         /* rotate & sort the brw_page array */
951         for (i = 0; i < oa_bufs; i++) {
952                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
953
954                 shift = info[stripe].index + info[stripe].off;
955                 LASSERT(shift < oa_bufs);
956                 set->set_pga[shift] = pga[i];
957                 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
958                                   &set->set_pga[shift].disk_offset);
959                 info[stripe].off++;
960         }
961 out:
962         if (info)
963                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
964
965         if (rc == 0)
966                 *reqset = set;
967         else
968                 lov_fini_brw_set(set);
969
970         RETURN(rc);
971 }
972
973 static int getattr_done(struct lov_request_set *set)
974 {
975         return common_attr_done(set);
976 }
977
978 int lov_fini_getattr_set(struct lov_request_set *set)
979 {
980         int rc = 0;
981         ENTRY;
982
983         LASSERT(set->set_exp);
984         if (set == NULL)
985                 RETURN(0);
986         if (set->set_completes)
987                 rc = getattr_done(set);
988
989         if (atomic_dec_and_test(&set->set_refcount))
990                 lov_finish_set(set);
991
992         RETURN(rc);
993 }
994
995 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
996                          struct lov_stripe_md *lsm,
997                          struct lov_request_set **reqset)
998 {
999         struct lov_request_set *set;
1000         struct lov_oinfo *loi = NULL;
1001         struct lov_obd *lov = &exp->exp_obd->u.lov;
1002         int rc = 0, i;
1003         ENTRY;
1004
1005         OBD_ALLOC(set, sizeof(*set));
1006         if (set == NULL)
1007                 RETURN(-ENOMEM);
1008         lov_init_set(set);
1009
1010         set->set_exp = exp;
1011         set->set_md = lsm;
1012         set->set_oa = src_oa;
1013
1014         loi = lsm->lsm_oinfo;
1015         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1016                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1017                 struct lov_request *req;
1018
1019                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1020                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1021                         continue;
1022                 }
1023
1024                 OBD_ALLOC(req, sizeof(*req));
1025                 if (req == NULL) {
1026                         lov_tgt_decref(lov, tgt);
1027                         GOTO(out_set, rc = -ENOMEM);
1028                 }
1029
1030                 req->rq_stripe = i;
1031                 req->rq_idx = loi->loi_ost_idx;
1032                 req->rq_gen = loi->loi_ost_gen;
1033
1034                 req->rq_oa = obdo_alloc();
1035                 if (req->rq_oa == NULL) {
1036                         OBD_FREE(req, sizeof(*req));
1037                         lov_tgt_decref(lov, tgt);
1038                         GOTO(out_set, rc = -ENOMEM);
1039                 }
1040                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1041                 req->rq_oa->o_id = loi->loi_id;
1042                 lov_set_add_req(req, set);
1043         }
1044         if (!set->set_count)
1045                 GOTO(out_set, rc = -EIO);
1046         *reqset = set;
1047         RETURN(rc);
1048 out_set:
1049         lov_fini_getattr_set(set);
1050         RETURN(rc);
1051 }
1052
1053 int lov_fini_destroy_set(struct lov_request_set *set)
1054 {
1055         ENTRY;
1056
1057         LASSERT(set->set_exp);
1058         if (set == NULL)
1059                 RETURN(0);
1060         if (set->set_completes) {
1061                 /* FIXME update qos data here */
1062         }
1063
1064         if (atomic_dec_and_test(&set->set_refcount))
1065                 lov_finish_set(set);
1066
1067         RETURN(0);
1068 }
1069
1070 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
1071                          struct lov_stripe_md *lsm,
1072                          struct obd_trans_info *oti,
1073                          struct lov_request_set **reqset)
1074 {
1075         struct lov_request_set *set;
1076         struct lov_oinfo *loi = NULL;
1077         struct lov_obd *lov = &exp->exp_obd->u.lov;
1078         int rc = 0, cookie_set = 0, i;
1079         ENTRY;
1080
1081         OBD_ALLOC(set, sizeof(*set));
1082         if (set == NULL)
1083                 RETURN(-ENOMEM);
1084         lov_init_set(set);
1085
1086         set->set_exp = exp;
1087         set->set_md = lsm;
1088         set->set_oa = src_oa;
1089         set->set_oti = oti;
1090         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1091                 set->set_cookies = oti->oti_logcookies;
1092
1093         loi = lsm->lsm_oinfo;
1094         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1096                 struct lov_request *req;
1097
1098                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1099                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1100                         continue;
1101                 }
1102
1103                 OBD_ALLOC(req, sizeof(*req));
1104                 if (req == NULL) {
1105                         lov_tgt_decref(lov, tgt);
1106                         GOTO(out_set, rc = -ENOMEM);
1107                 }
1108
1109                 req->rq_stripe = i;
1110                 req->rq_idx = loi->loi_ost_idx;
1111                 req->rq_gen = loi->loi_ost_gen;
1112
1113                 req->rq_oa = obdo_alloc();
1114                 if (req->rq_oa == NULL) {
1115                         OBD_FREE(req, sizeof(*req));
1116                         lov_tgt_decref(lov, tgt);
1117                         GOTO(out_set, rc = -ENOMEM);
1118                 }
1119
1120                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1121                 req->rq_oa->o_id = loi->loi_id;
1122
1123                 /* Setup the first request's cookie position */
1124                 if (!cookie_set && set->set_cookies) {
1125                         oti->oti_logcookies = set->set_cookies + i;
1126                         cookie_set = 1;
1127                 }
1128                 lov_set_add_req(req, set);
1129         }
1130         if (!set->set_count)
1131                 GOTO(out_set, rc = -EIO);
1132         *reqset = set;
1133         RETURN(rc);
1134 out_set:
1135         lov_fini_destroy_set(set);
1136         RETURN(rc);
1137 }
1138
1139 static int setattr_done(struct lov_request_set *set)
1140 {
1141         return common_attr_done(set);
1142 }
1143
1144 int lov_fini_setattr_set(struct lov_request_set *set)
1145 {
1146         int rc = 0;
1147         ENTRY;
1148
1149         LASSERT(set->set_exp);
1150         if (set == NULL)
1151                 RETURN(0);
1152         if (set->set_completes) {
1153                 rc = setattr_done(set);
1154                 /* FIXME update qos data here */
1155         }
1156
1157         if (atomic_dec_and_test(&set->set_refcount))
1158                 lov_finish_set(set);
1159         RETURN(rc);
1160 }
1161
1162 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1163                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1164                          struct lov_request_set **reqset)
1165 {
1166         struct lov_request_set *set;
1167         struct lov_oinfo *loi = NULL;
1168         struct lov_obd *lov = &exp->exp_obd->u.lov;
1169         int rc = 0, i;
1170         ENTRY;
1171
1172         OBD_ALLOC(set, sizeof(*set));
1173         if (set == NULL)
1174                 RETURN(-ENOMEM);
1175         lov_init_set(set);
1176
1177         set->set_exp = exp;
1178         set->set_md = lsm;
1179         set->set_oa = src_oa;
1180
1181         loi = lsm->lsm_oinfo;
1182         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1183                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1184                 struct lov_request *req;
1185
1186                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1187                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1188                         continue;
1189                 }
1190
1191                 OBD_ALLOC(req, sizeof(*req));
1192                 if (req == NULL) {
1193                         lov_tgt_decref(lov, tgt);
1194                         GOTO(out_set, rc = -ENOMEM);
1195                 }
1196
1197                 req->rq_stripe = i;
1198                 req->rq_idx = loi->loi_ost_idx;
1199                 req->rq_gen = loi->loi_ost_gen;
1200
1201                 req->rq_oa = obdo_alloc();
1202                 if (req->rq_oa == NULL) {
1203                         OBD_FREE(req, sizeof(*req));
1204                         lov_tgt_decref(lov, tgt);
1205                         GOTO(out_set, rc = -ENOMEM);
1206                 }
1207
1208                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1209                 req->rq_oa->o_id = loi->loi_id;
1210                 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
1211
1212                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1213                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1214                                               &req->rq_oa->o_size) < 0 &&
1215                             req->rq_oa->o_size)
1216                                 req->rq_oa->o_size--;
1217                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1218                                i, req->rq_oa->o_size, src_oa->o_size);
1219                 }
1220                 lov_set_add_req(req, set);
1221         }
1222         if (!set->set_count)
1223                 GOTO(out_set, rc = -EIO);
1224         *reqset = set;
1225         RETURN(rc);
1226 out_set:
1227         lov_fini_setattr_set(set);
1228         RETURN(rc);
1229 }
1230
1231 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1232                          int rc)
1233 {
1234         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1235         ENTRY;
1236
1237         lov_update_set(set, req, rc);
1238         if (rc) {
1239                 struct lov_tgt_desc *tgt = lov->tgts + req->rq_idx;
1240
1241                 if (lov_tgt_active(lov, tgt, req->rq_gen))
1242                         lov_tgt_decref(lov, tgt);
1243                 else
1244                         rc = 0;
1245         }
1246
1247         /* FIXME in raid1 regime, should return 0 */
1248         RETURN(rc);
1249 }
1250
1251 int lov_fini_punch_set(struct lov_request_set *set)
1252 {
1253         int rc = 0;
1254         ENTRY;
1255
1256         LASSERT(set->set_exp);
1257         if (set == NULL)
1258                 RETURN(0);
1259         if (set->set_completes) {
1260                 if (!set->set_success)
1261                         rc = -EIO;
1262                 /* FIXME update qos data here */
1263         }
1264
1265         if (atomic_dec_and_test(&set->set_refcount))
1266                 lov_finish_set(set);
1267
1268         RETURN(rc);
1269 }
1270
1271 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1272                        struct lov_stripe_md *lsm, obd_off start,
1273                        obd_off end, struct obd_trans_info *oti,
1274                        struct lov_request_set **reqset)
1275 {
1276         struct lov_request_set *set;
1277         struct lov_oinfo *loi = NULL;
1278         struct lov_obd *lov = &exp->exp_obd->u.lov;
1279         int rc = 0, i;
1280         ENTRY;
1281
1282         OBD_ALLOC(set, sizeof(*set));
1283         if (set == NULL)
1284                 RETURN(-ENOMEM);
1285         lov_init_set(set);
1286
1287         set->set_exp = exp;
1288         set->set_md = lsm;
1289         set->set_oa = src_oa;
1290
1291         loi = lsm->lsm_oinfo;
1292         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1293                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1294                 struct lov_request *req;
1295                 obd_off rs, re;
1296
1297                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1298                         continue;
1299
1300                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1301                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1302                         continue;
1303                 }
1304
1305                 OBD_ALLOC(req, sizeof(*req));
1306                 if (req == NULL) {
1307                         lov_tgt_decref(lov, tgt);
1308                         GOTO(out_set, rc = -ENOMEM);
1309                 }
1310
1311                 req->rq_stripe = i;
1312                 req->rq_idx = loi->loi_ost_idx;
1313                 req->rq_gen = loi->loi_ost_gen;
1314
1315                 req->rq_oa = obdo_alloc();
1316                 if (req->rq_oa == NULL) {
1317                         OBD_FREE(req, sizeof(*req));
1318                         lov_tgt_decref(lov, tgt);
1319                         GOTO(out_set, rc = -ENOMEM);
1320                 }
1321
1322                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1323                 req->rq_oa->o_id = loi->loi_id;
1324                 req->rq_oa->o_gr = loi->loi_gr;
1325                 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1326
1327                 req->rq_extent.start = rs;
1328                 req->rq_extent.end = re;
1329                 req->rq_extent.gid = -1;
1330
1331                 lov_set_add_req(req, set);
1332         }
1333         if (!set->set_count)
1334                 GOTO(out_set, rc = -EIO);
1335         *reqset = set;
1336         RETURN(rc);
1337 out_set:
1338         lov_fini_punch_set(set);
1339         RETURN(rc);
1340 }
1341
1342 int lov_fini_sync_set(struct lov_request_set *set)
1343 {
1344         int rc = 0;
1345         ENTRY;
1346
1347         LASSERT(set->set_exp);
1348         if (set == NULL)
1349                 RETURN(0);
1350         if (set->set_completes) {
1351                 if (!set->set_success)
1352                         rc = -EIO;
1353                 /* FIXME update qos data here */
1354         }
1355
1356         if (atomic_dec_and_test(&set->set_refcount))
1357                 lov_finish_set(set);
1358
1359         RETURN(rc);
1360 }
1361
1362 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1363                       struct lov_stripe_md *lsm, obd_off start,
1364                       obd_off end, struct lov_request_set **reqset)
1365 {
1366         struct lov_request_set *set;
1367         struct lov_oinfo *loi = NULL;
1368         struct lov_obd *lov = &exp->exp_obd->u.lov;
1369         int rc = 0, i;
1370         ENTRY;
1371
1372         OBD_ALLOC(set, sizeof(*set));
1373         if (set == NULL)
1374                 RETURN(-ENOMEM);
1375         lov_init_set(set);
1376
1377         set->set_exp = exp;
1378         set->set_md = lsm;
1379         set->set_oa = src_oa;
1380
1381         loi = lsm->lsm_oinfo;
1382         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1383                 struct lov_tgt_desc *tgt = lov->tgts + loi->loi_ost_idx;
1384                 struct lov_request *req;
1385                 obd_off rs, re;
1386
1387                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1388                         continue;
1389
1390                 if (!lov_tgt_active(lov, tgt, loi->loi_ost_gen)) {
1391                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1392                         continue;
1393                 }
1394
1395                 OBD_ALLOC(req, sizeof(*req));
1396                 if (req == NULL) {
1397                         lov_tgt_decref(lov, tgt);
1398                         GOTO(out_set, rc = -ENOMEM);
1399                 }
1400
1401                 req->rq_stripe = i;
1402                 req->rq_idx = loi->loi_ost_idx;
1403                 req->rq_gen = loi->loi_ost_gen;
1404
1405                 req->rq_oa = obdo_alloc();
1406                 if (req->rq_oa == NULL) {
1407                         OBD_FREE(req, sizeof(*req));
1408                         lov_tgt_decref(lov, tgt);
1409                         GOTO(out_set, rc = -ENOMEM);
1410                 }
1411                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1412                 req->rq_oa->o_id = loi->loi_id;
1413                 req->rq_extent.start = rs;
1414                 req->rq_extent.end = re;
1415                 req->rq_extent.gid = -1;
1416
1417                 lov_set_add_req(req, set);
1418         }
1419         if (!set->set_count)
1420                 GOTO(out_set, rc = -EIO);
1421         *reqset = set;
1422         RETURN(rc);
1423 out_set:
1424         lov_fini_sync_set(set);
1425         RETURN(rc);
1426 }