Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62 }
63
64 static void lov_finish_set(struct lov_request_set *set)
65 {
66         struct list_head *pos, *n;
67         ENTRY;
68
69         LASSERT(set);
70         list_for_each_safe(pos, n, &set->set_list) {
71                 struct lov_request *req = list_entry(pos, struct lov_request,
72                                                      rq_link);
73                 list_del_init(&req->rq_link);
74
75                 if (req->rq_oi.oi_oa)
76                         OBDO_FREE(req->rq_oi.oi_oa);
77                 if (req->rq_oi.oi_md)
78                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
79                 if (req->rq_oi.oi_osfs)
80                         OBD_FREE(req->rq_oi.oi_osfs,
81                                  sizeof(*req->rq_oi.oi_osfs));
82                 OBD_FREE(req, sizeof(*req));
83         }
84
85         if (set->set_pga) {
86                 int len = set->set_oabufs * sizeof(*set->set_pga);
87                 OBD_FREE(set->set_pga, len);
88         }
89         if (set->set_lockh)
90                 lov_llh_put(set->set_lockh);
91
92         OBD_FREE(set, sizeof(*set));
93         EXIT;
94 }
95
96 void lov_update_set(struct lov_request_set *set,
97                     struct lov_request *req, int rc)
98 {
99         req->rq_complete = 1;
100         req->rq_rc = rc;
101
102         set->set_completes++;
103         if (rc == 0)
104                 set->set_success++;
105 }
106
107 int lov_update_common_set(struct lov_request_set *set,
108                           struct lov_request *req, int rc)
109 {
110         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
111         ENTRY;
112
113         lov_update_set(set, req, rc);
114
115         /* grace error on inactive ost */
116         if (rc && !(lov->lov_tgts[req->rq_idx] && 
117                     lov->lov_tgts[req->rq_idx]->ltd_active))
118                 rc = 0;
119
120         /* FIXME in raid1 regime, should return 0 */
121         RETURN(rc);
122 }
123
124 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
125 {
126         list_add_tail(&req->rq_link, &set->set_list);
127         set->set_count++;
128 }
129
130 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
131 {
132         struct lov_request_set *set = req->rq_rqset;
133         struct lustre_handle *lov_lockhp;
134         struct lov_oinfo *loi;
135         ENTRY;
136
137         LASSERT(set != NULL);
138         LASSERT(set->set_oi != NULL);
139
140         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
141         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
142
143         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
144          * and that copy can be arbitrarily out of date.
145          *
146          * The LOV API is due for a serious rewriting anyways, and this
147          * can be addressed then. */
148
149         if (rc == ELDLM_OK) {
150                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
151                 __u64 tmp;
152
153                 LASSERT(lock != NULL);
154                 lov_stripe_lock(set->set_oi->oi_md);
155                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
156                 tmp = loi->loi_lvb.lvb_size;
157                 /* Extend KMS up to the end of this lock and no further
158                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
159                 if (tmp > lock->l_policy_data.l_extent.end)
160                         tmp = lock->l_policy_data.l_extent.end + 1;
161                 if (tmp >= loi->loi_kms) {
162                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
163                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
164                         loi->loi_kms = tmp;
165                         loi->loi_kms_valid = 1;
166                 } else {
167                         LDLM_DEBUG(lock, "lock acquired, setting rss="
168                                    LPU64"; leaving kms="LPU64", end="LPU64,
169                                    loi->loi_lvb.lvb_size, loi->loi_kms,
170                                    lock->l_policy_data.l_extent.end);
171                 }
172                 lov_stripe_unlock(set->set_oi->oi_md);
173                 ldlm_lock_allow_match(lock);
174                 LDLM_LOCK_PUT(lock);
175         } else if ((rc == ELDLM_LOCK_ABORTED) &&
176                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
177                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
178                 lov_stripe_lock(set->set_oi->oi_md);
179                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
180                 lov_stripe_unlock(set->set_oi->oi_md);
181                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
182                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
183                 rc = ELDLM_OK;
184         } else {
185                 struct obd_export *exp = set->set_exp;
186                 struct lov_obd *lov = &exp->exp_obd->u.lov;
187
188                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
189                 if (lov->lov_tgts[req->rq_idx] && 
190                     lov->lov_tgts[req->rq_idx]->ltd_active) {
191                         /* -EUSERS used by OST to report file contention */
192                         if (rc != -EINTR && rc != -EUSERS)
193                                 CERROR("enqueue objid "LPX64" subobj "
194                                        LPX64" on OST idx %d: rc %d\n",
195                                        set->set_oi->oi_md->lsm_object_id,
196                                        loi->loi_id, loi->loi_ost_idx, rc);
197                 } else {
198                         rc = ELDLM_OK;
199                 }
200         }
201         lov_update_set(set, req, rc);
202         RETURN(rc);
203 }
204
205 /* The callback for osc_enqueue that updates lov info for every OSC request. */
206 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
207 {
208         struct ldlm_enqueue_info *einfo;
209         struct lov_request *lovreq;
210
211         lovreq = container_of(oinfo, struct lov_request, rq_oi);
212         einfo = lovreq->rq_rqset->set_ei;
213         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
214 }
215
216 static int enqueue_done(struct lov_request_set *set, __u32 mode)
217 {
218         struct lov_request *req;
219         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
220         int rc = 0;
221         ENTRY;
222
223         /* enqueue/match success, just return */
224         if (set->set_completes && set->set_completes == set->set_success)
225                 RETURN(0);
226
227         /* cancel enqueued/matched locks */
228         list_for_each_entry(req, &set->set_list, rq_link) {
229                 struct lustre_handle *lov_lockhp;
230
231                 if (!req->rq_complete || req->rq_rc)
232                         continue;
233
234                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
235                 LASSERT(lov_lockhp);
236                 if (!lustre_handle_is_used(lov_lockhp))
237                         continue;
238
239                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
240                                 req->rq_oi.oi_md, mode, lov_lockhp);
241                 if (rc && lov->lov_tgts[req->rq_idx] &&
242                     lov->lov_tgts[req->rq_idx]->ltd_active)
243                         CERROR("cancelling obdjid "LPX64" on OST "
244                                "idx %d error: rc = %d\n",
245                                req->rq_oi.oi_md->lsm_object_id,
246                                req->rq_idx, rc);
247         }
248         if (set->set_lockh)
249                 lov_llh_put(set->set_lockh);
250         RETURN(rc);
251 }
252
253 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
254                          struct ptlrpc_request_set *rqset)
255 {
256         int ret = 0;
257         ENTRY;
258
259         if (set == NULL)
260                 RETURN(0);
261         LASSERT(set->set_exp);
262         /* Do enqueue_done only for sync requests and if any request
263          * succeeded. */
264         if (!rqset) {
265                 if (rc)
266                         set->set_completes = 0;
267                 ret = enqueue_done(set, mode);
268         } else if (set->set_lockh)
269                 lov_llh_put(set->set_lockh);
270
271         if (atomic_dec_and_test(&set->set_refcount))
272                 lov_finish_set(set);
273
274         RETURN(rc ? rc : ret);
275 }
276
277 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
278                          struct ldlm_enqueue_info *einfo,
279                          struct lov_request_set **reqset)
280 {
281         struct lov_obd *lov = &exp->exp_obd->u.lov;
282         struct lov_request_set *set;
283         int i, rc = 0;
284         ENTRY;
285
286         OBD_ALLOC(set, sizeof(*set));
287         if (set == NULL)
288                 RETURN(-ENOMEM);
289         lov_init_set(set);
290
291         set->set_exp = exp;
292         set->set_oi = oinfo;
293         set->set_ei = einfo;
294         set->set_lockh = lov_llh_new(oinfo->oi_md);
295         if (set->set_lockh == NULL)
296                 GOTO(out_set, rc = -ENOMEM);
297         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
298
299         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
300                 struct lov_oinfo *loi;
301                 struct lov_request *req;
302                 obd_off start, end;
303
304                 loi = oinfo->oi_md->lsm_oinfo[i];
305                 if (!lov_stripe_intersects(oinfo->oi_md, i,
306                                            oinfo->oi_policy.l_extent.start,
307                                            oinfo->oi_policy.l_extent.end,
308                                            &start, &end))
309                         continue;
310
311                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
312                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
313                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
314                         continue;
315                 }
316
317                 OBD_ALLOC(req, sizeof(*req));
318                 if (req == NULL)
319                         GOTO(out_set, rc = -ENOMEM);
320
321                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
322                         sizeof(struct lov_oinfo *) +
323                         sizeof(struct lov_oinfo);
324                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
325                 if (req->rq_oi.oi_md == NULL) {
326                         OBD_FREE(req, sizeof(*req));
327                         GOTO(out_set, rc = -ENOMEM);
328                 }
329                 req->rq_oi.oi_md->lsm_oinfo[0] =
330                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
331                         sizeof(struct lov_oinfo *);
332
333
334                 req->rq_rqset = set;
335                 /* Set lov request specific parameters. */
336                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
337                 req->rq_oi.oi_cb_up = cb_update_enqueue;
338                 req->rq_oi.oi_flags = oinfo->oi_flags;
339
340                 LASSERT(req->rq_oi.oi_lockh);
341
342                 req->rq_oi.oi_policy.l_extent.gid =
343                         oinfo->oi_policy.l_extent.gid;
344                 req->rq_oi.oi_policy.l_extent.start = start;
345                 req->rq_oi.oi_policy.l_extent.end = end;
346
347                 req->rq_idx = loi->loi_ost_idx;
348                 req->rq_stripe = i;
349
350                 /* XXX LOV STACKING: submd should be from the subobj */
351                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
352                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
353                 req->rq_oi.oi_md->lsm_stripe_count = 0;
354                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
355                         loi->loi_kms_valid;
356                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
357                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
358
359                 lov_set_add_req(req, set);
360         }
361         if (!set->set_count)
362                 GOTO(out_set, rc = -EIO);
363         *reqset = set;
364         RETURN(0);
365 out_set:
366         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
367         RETURN(rc);
368 }
369
370 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
371                          int rc)
372 {
373         int ret = rc;
374         ENTRY;
375
376         if (rc > 0)
377                 ret = 0;
378         else if (rc == 0)
379                 ret = 1;
380         lov_update_set(set, req, ret);
381         RETURN(rc);
382 }
383
384 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
385 {
386         int rc = 0;
387         ENTRY;
388
389         if (set == NULL)
390                 RETURN(0);
391         LASSERT(set->set_exp);
392         rc = enqueue_done(set, mode);
393         if ((set->set_count == set->set_success) &&
394             (flags & LDLM_FL_TEST_LOCK))
395                 lov_llh_put(set->set_lockh);
396
397         if (atomic_dec_and_test(&set->set_refcount))
398                 lov_finish_set(set);
399
400         RETURN(rc);
401 }
402
403 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
404                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
405                        __u32 mode, struct lustre_handle *lockh,
406                        struct lov_request_set **reqset)
407 {
408         struct lov_obd *lov = &exp->exp_obd->u.lov;
409         struct lov_request_set *set;
410         int i, rc = 0;
411         ENTRY;
412
413         OBD_ALLOC(set, sizeof(*set));
414         if (set == NULL)
415                 RETURN(-ENOMEM);
416         lov_init_set(set);
417
418         set->set_exp = exp;
419         set->set_oi = oinfo;
420         set->set_oi->oi_md = lsm;
421         set->set_lockh = lov_llh_new(lsm);
422         if (set->set_lockh == NULL)
423                 GOTO(out_set, rc = -ENOMEM);
424         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
425
426         for (i = 0; i < lsm->lsm_stripe_count; i++){
427                 struct lov_oinfo *loi;
428                 struct lov_request *req;
429                 obd_off start, end;
430
431                 loi = lsm->lsm_oinfo[i];
432                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
433                                            policy->l_extent.end, &start, &end))
434                         continue;
435
436                 /* FIXME raid1 should grace this error */
437                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
438                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
439                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
440                         GOTO(out_set, rc = -EIO);
441                 }
442
443                 OBD_ALLOC(req, sizeof(*req));
444                 if (req == NULL)
445                         GOTO(out_set, rc = -ENOMEM);
446
447                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
448                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
449                 if (req->rq_oi.oi_md == NULL) {
450                         OBD_FREE(req, sizeof(*req));
451                         GOTO(out_set, rc = -ENOMEM);
452                 }
453
454                 req->rq_oi.oi_policy.l_extent.start = start;
455                 req->rq_oi.oi_policy.l_extent.end = end;
456                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
457
458                 req->rq_idx = loi->loi_ost_idx;
459                 req->rq_stripe = i;
460
461                 /* XXX LOV STACKING: submd should be from the subobj */
462                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
463                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
464                 req->rq_oi.oi_md->lsm_stripe_count = 0;
465
466                 lov_set_add_req(req, set);
467         }
468         if (!set->set_count)
469                 GOTO(out_set, rc = -EIO);
470         *reqset = set;
471         RETURN(rc);
472 out_set:
473         lov_fini_match_set(set, mode, 0);
474         RETURN(rc);
475 }
476
477 int lov_fini_cancel_set(struct lov_request_set *set)
478 {
479         int rc = 0;
480         ENTRY;
481
482         if (set == NULL)
483                 RETURN(0);
484
485         LASSERT(set->set_exp);
486         if (set->set_lockh)
487                 lov_llh_put(set->set_lockh);
488
489         if (atomic_dec_and_test(&set->set_refcount))
490                 lov_finish_set(set);
491
492         RETURN(rc);
493 }
494
495 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
496                         struct lov_stripe_md *lsm, __u32 mode,
497                         struct lustre_handle *lockh,
498                         struct lov_request_set **reqset)
499 {
500         struct lov_request_set *set;
501         int i, rc = 0;
502         ENTRY;
503
504         OBD_ALLOC(set, sizeof(*set));
505         if (set == NULL)
506                 RETURN(-ENOMEM);
507         lov_init_set(set);
508
509         set->set_exp = exp;
510         set->set_oi = oinfo;
511         set->set_oi->oi_md = lsm;
512         set->set_lockh = lov_handle2llh(lockh);
513         if (set->set_lockh == NULL) {
514                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
515                 GOTO(out_set, rc = -EINVAL);
516         }
517         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
518
519         for (i = 0; i < lsm->lsm_stripe_count; i++){
520                 struct lov_request *req;
521                 struct lustre_handle *lov_lockhp;
522                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
523
524                 lov_lockhp = set->set_lockh->llh_handles + i;
525                 if (!lustre_handle_is_used(lov_lockhp)) {
526                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
527                                loi->loi_ost_idx, loi->loi_id);
528                         continue;
529                 }
530
531                 OBD_ALLOC(req, sizeof(*req));
532                 if (req == NULL)
533                         GOTO(out_set, rc = -ENOMEM);
534
535                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
536                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
537                 if (req->rq_oi.oi_md == NULL) {
538                         OBD_FREE(req, sizeof(*req));
539                         GOTO(out_set, rc = -ENOMEM);
540                 }
541
542                 req->rq_idx = loi->loi_ost_idx;
543                 req->rq_stripe = i;
544
545                 /* XXX LOV STACKING: submd should be from the subobj */
546                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
547                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
548                 req->rq_oi.oi_md->lsm_stripe_count = 0;
549
550                 lov_set_add_req(req, set);
551         }
552         if (!set->set_count)
553                 GOTO(out_set, rc = -EIO);
554         *reqset = set;
555         RETURN(rc);
556 out_set:
557         lov_fini_cancel_set(set);
558         RETURN(rc);
559 }
560
561 static int create_done(struct obd_export *exp, struct lov_request_set *set,
562                        struct lov_stripe_md **lsmp)
563 {
564         struct lov_obd *lov = &exp->exp_obd->u.lov;
565         struct obd_trans_info *oti = set->set_oti;
566         struct obdo *src_oa = set->set_oi->oi_oa;
567         struct lov_request *req;
568         struct obdo *ret_oa = NULL;
569         int attrset = 0, rc = 0;
570         ENTRY;
571
572         LASSERT(set->set_completes);
573
574         /* try alloc objects on other osts if osc_create fails for
575          * exceptions: RPC failure, ENOSPC, etc */
576         if (set->set_count != set->set_success) {
577                 list_for_each_entry (req, &set->set_list, rq_link) {
578                         if (req->rq_rc == 0)
579                                 continue;
580
581                         set->set_completes--;
582                         req->rq_complete = 0;
583
584                         rc = qos_remedy_create(set, req);
585                         lov_update_create_set(set, req, rc);
586
587                         if (rc)
588                                 break;
589                 }
590         }
591
592         /* no successful creates */
593         if (set->set_success == 0)
594                 GOTO(cleanup, rc);
595
596         /* If there was an explicit stripe set, fail.  Otherwise, we
597          * got some objects and that's not bad. */
598         if (set->set_count != set->set_success) {
599                 if (*lsmp)
600                         GOTO(cleanup, rc);
601                 set->set_count = set->set_success;
602                 qos_shrink_lsm(set);
603         }
604
605         OBDO_ALLOC(ret_oa);
606         if (ret_oa == NULL)
607                 GOTO(cleanup, rc = -ENOMEM);
608
609         list_for_each_entry(req, &set->set_list, rq_link) {
610                 if (!req->rq_complete || req->rq_rc)
611                         continue;
612                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
613                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
614                                 req->rq_stripe, &attrset);
615         }
616         if (src_oa->o_valid & OBD_MD_FLSIZE &&
617             ret_oa->o_size != src_oa->o_size) {
618                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
619                        src_oa->o_size, ret_oa->o_size);
620                 LBUG();
621         }
622         ret_oa->o_id = src_oa->o_id;
623         ret_oa->o_gr = src_oa->o_gr;
624         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
625         memcpy(src_oa, ret_oa, sizeof(*src_oa));
626         OBDO_FREE(ret_oa);
627
628         *lsmp = set->set_oi->oi_md;
629         GOTO(done, rc = 0);
630
631 cleanup:
632         list_for_each_entry(req, &set->set_list, rq_link) {
633                 struct obd_export *sub_exp;
634                 int err = 0;
635
636                 if (!req->rq_complete || req->rq_rc)
637                         continue;
638
639                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
640                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
641                 if (err)
642                         CERROR("Failed to uncreate objid "LPX64" subobj "
643                                LPX64" on OST idx %d: rc = %d\n",
644                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
645                                req->rq_idx, rc);
646         }
647         if (*lsmp == NULL)
648                 obd_free_memmd(exp, &set->set_oi->oi_md);
649 done:
650         if (oti && set->set_cookies) {
651                 oti->oti_logcookies = set->set_cookies;
652                 if (!set->set_cookie_sent) {
653                         oti_free_cookies(oti);
654                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
655                 } else {
656                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
657                 }
658         }
659         RETURN(rc);
660 }
661
662 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
663 {
664         int rc = 0;
665         ENTRY;
666
667         if (set == NULL)
668                 RETURN(0);
669         LASSERT(set->set_exp);
670         if (set->set_completes)
671                 rc = create_done(set->set_exp, set, lsmp);
672
673         if (atomic_dec_and_test(&set->set_refcount))
674                 lov_finish_set(set);
675
676         RETURN(rc);
677 }
678
679 int lov_update_create_set(struct lov_request_set *set,
680                           struct lov_request *req, int rc)
681 {
682         struct obd_trans_info *oti = set->set_oti;
683         struct lov_stripe_md *lsm = set->set_oi->oi_md;
684         struct lov_oinfo *loi;
685         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
686         ENTRY;
687
688         req->rq_stripe = set->set_success;
689         loi = lsm->lsm_oinfo[req->rq_stripe];
690
691         if (rc && lov->lov_tgts[req->rq_idx] &&
692             lov->lov_tgts[req->rq_idx]->ltd_active) {
693                 CERROR("error creating fid "LPX64" sub-object"
694                        " on OST idx %d/%d: rc = %d\n",
695                        set->set_oi->oi_oa->o_id, req->rq_idx,
696                        lsm->lsm_stripe_count, rc);
697                 if (rc > 0) {
698                         CERROR("obd_create returned invalid err %d\n", rc);
699                         rc = -EIO;
700                 }
701         }
702         lov_update_set(set, req, rc);
703         if (rc)
704                 RETURN(rc);
705
706         loi->loi_id = req->rq_oi.oi_oa->o_id;
707         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
708         loi->loi_ost_idx = req->rq_idx;
709         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
710                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
711         loi_init(loi);
712
713         if (oti && set->set_cookies)
714                 ++oti->oti_logcookies;
715         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
716                 set->set_cookie_sent++;
717
718         RETURN(0);
719 }
720
721 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
722                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
723                         struct obd_trans_info *oti,
724                         struct lov_request_set **reqset)
725 {
726         struct lov_request_set *set;
727         int rc = 0;
728         ENTRY;
729
730         OBD_ALLOC(set, sizeof(*set));
731         if (set == NULL)
732                 RETURN(-ENOMEM);
733         lov_init_set(set);
734
735         set->set_exp = exp;
736         set->set_oi = oinfo;
737         set->set_oi->oi_md = *lsmp;
738         set->set_oi->oi_oa = src_oa;
739         set->set_oti = oti;
740
741         rc = qos_prep_create(exp, set);
742         if (rc)
743                 lov_fini_create_set(set, lsmp);
744         else
745                 *reqset = set;
746         RETURN(rc);
747 }
748
749 static int common_attr_done(struct lov_request_set *set)
750 {
751         struct list_head *pos;
752         struct lov_request *req;
753         struct obdo *tmp_oa;
754         int rc = 0, attrset = 0;
755         ENTRY;
756
757         LASSERT(set->set_oi != NULL);
758
759         if (set->set_oi->oi_oa == NULL)
760                 RETURN(0);
761
762         if (!set->set_success)
763                 RETURN(-EIO);
764
765         OBDO_ALLOC(tmp_oa);
766         if (tmp_oa == NULL)
767                 GOTO(out, rc = -ENOMEM);
768
769         list_for_each (pos, &set->set_list) {
770                 req = list_entry(pos, struct lov_request, rq_link);
771
772                 if (!req->rq_complete || req->rq_rc)
773                         continue;
774                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
775                         continue;
776                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
777                                 req->rq_oi.oi_oa->o_valid,
778                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
779         }
780         if (!attrset) {
781                 CERROR("No stripes had valid attrs\n");
782                 rc = -EIO;
783         }
784         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
785         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
786 out:
787         if (tmp_oa)
788                 OBDO_FREE(tmp_oa);
789         RETURN(rc);
790
791 }
792
793 static int brw_done(struct lov_request_set *set)
794 {
795         struct lov_stripe_md *lsm = set->set_oi->oi_md;
796         struct lov_oinfo     *loi = NULL;
797         struct list_head *pos;
798         struct lov_request *req;
799         ENTRY;
800
801         list_for_each (pos, &set->set_list) {
802                 req = list_entry(pos, struct lov_request, rq_link);
803
804                 if (!req->rq_complete || req->rq_rc)
805                         continue;
806
807                 loi = lsm->lsm_oinfo[req->rq_stripe];
808
809                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
810                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
811         }
812
813         RETURN(0);
814 }
815
816 int lov_fini_brw_set(struct lov_request_set *set)
817 {
818         int rc = 0;
819         ENTRY;
820
821         if (set == NULL)
822                 RETURN(0);
823         LASSERT(set->set_exp);
824         if (set->set_completes) {
825                 rc = brw_done(set);
826                 /* FIXME update qos data here */
827         }
828         if (atomic_dec_and_test(&set->set_refcount))
829                 lov_finish_set(set);
830
831         RETURN(rc);
832 }
833
834 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
835                      obd_count oa_bufs, struct brw_page *pga,
836                      struct obd_trans_info *oti,
837                      struct lov_request_set **reqset)
838 {
839         struct {
840                 obd_count       index;
841                 obd_count       count;
842                 obd_count       off;
843         } *info = NULL;
844         struct lov_request_set *set;
845         struct lov_obd *lov = &exp->exp_obd->u.lov;
846         int rc = 0, i, shift;
847         ENTRY;
848
849         OBD_ALLOC(set, sizeof(*set));
850         if (set == NULL)
851                 RETURN(-ENOMEM);
852         lov_init_set(set);
853
854         set->set_exp = exp;
855         set->set_oti = oti;
856         set->set_oi = oinfo;
857         set->set_oabufs = oa_bufs;
858         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
859         if (!set->set_pga)
860                 GOTO(out, rc = -ENOMEM);
861
862         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
863         if (!info)
864                 GOTO(out, rc = -ENOMEM);
865
866         /* calculate the page count for each stripe */
867         for (i = 0; i < oa_bufs; i++) {
868                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
869                 info[stripe].count++;
870         }
871
872         /* alloc and initialize lov request */
873         shift = 0;
874         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
875                 struct lov_oinfo *loi = NULL;
876                 struct lov_request *req;
877
878                 if (info[i].count == 0)
879                         continue;
880                 
881                 loi = oinfo->oi_md->lsm_oinfo[i];
882                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
883                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
884                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
885                         GOTO(out, rc = -EIO);
886                 }
887
888                 OBD_ALLOC(req, sizeof(*req));
889                 if (req == NULL)
890                         GOTO(out, rc = -ENOMEM);
891
892                 OBDO_ALLOC(req->rq_oi.oi_oa);
893                 if (req->rq_oi.oi_oa == NULL) {
894                         OBD_FREE(req, sizeof(*req));
895                         GOTO(out, rc = -ENOMEM);
896                 }
897
898                 if (oinfo->oi_oa) {
899                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
900                                sizeof(*req->rq_oi.oi_oa));
901                 }
902                 req->rq_oi.oi_oa->o_id = loi->loi_id;
903                 req->rq_oi.oi_oa->o_stripe_idx = i;
904
905                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
906                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
907                 if (req->rq_oi.oi_md == NULL) {
908                         OBDO_FREE(req->rq_oi.oi_oa);
909                         OBD_FREE(req, sizeof(*req));
910                         GOTO(out, rc = -ENOMEM);
911                 }
912
913                 req->rq_idx = loi->loi_ost_idx;
914                 req->rq_stripe = i;
915
916                 /* XXX LOV STACKING */
917                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
918                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
919                 req->rq_oabufs = info[i].count;
920                 req->rq_pgaidx = shift;
921                 shift += req->rq_oabufs;
922
923                 /* remember the index for sort brw_page array */
924                 info[i].index = req->rq_pgaidx;
925
926                 req->rq_oi.oi_capa = oinfo->oi_capa;
927
928                 lov_set_add_req(req, set);
929         }
930         if (!set->set_count)
931                 GOTO(out, rc = -EIO);
932
933         /* rotate & sort the brw_page array */
934         for (i = 0; i < oa_bufs; i++) {
935                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
936
937                 shift = info[stripe].index + info[stripe].off;
938                 LASSERT(shift < oa_bufs);
939                 set->set_pga[shift] = pga[i];
940                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
941                                   &set->set_pga[shift].off);
942                 info[stripe].off++;
943         }
944 out:
945         if (info)
946                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
947
948         if (rc == 0)
949                 *reqset = set;
950         else
951                 lov_fini_brw_set(set);
952
953         RETURN(rc);
954 }
955
956 int lov_fini_getattr_set(struct lov_request_set *set)
957 {
958         int rc = 0;
959         ENTRY;
960
961         if (set == NULL)
962                 RETURN(0);
963         LASSERT(set->set_exp);
964         if (set->set_completes)
965                 rc = common_attr_done(set);
966
967         if (atomic_dec_and_test(&set->set_refcount))
968                 lov_finish_set(set);
969
970         RETURN(rc);
971 }
972
973 /* The callback for osc_getattr_async that finilizes a request info when a
974  * response is recieved. */
975 static int cb_getattr_update(struct obd_info *oinfo, int rc)
976 {
977         struct lov_request *lovreq;
978         lovreq = container_of(oinfo, struct lov_request, rq_oi);
979         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
980 }
981
982 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
983                          struct lov_request_set **reqset)
984 {
985         struct lov_request_set *set;
986         struct lov_obd *lov = &exp->exp_obd->u.lov;
987         int rc = 0, i;
988         ENTRY;
989
990         OBD_ALLOC(set, sizeof(*set));
991         if (set == NULL)
992                 RETURN(-ENOMEM);
993         lov_init_set(set);
994
995         set->set_exp = exp;
996         set->set_oi = oinfo;
997
998         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
999                 struct lov_oinfo *loi;
1000                 struct lov_request *req;
1001
1002                 loi = oinfo->oi_md->lsm_oinfo[i];
1003                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1004                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1005                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1006                         continue;
1007                 }
1008
1009                 OBD_ALLOC(req, sizeof(*req));
1010                 if (req == NULL)
1011                         GOTO(out_set, rc = -ENOMEM);
1012
1013                 req->rq_stripe = i;
1014                 req->rq_idx = loi->loi_ost_idx;
1015
1016                 OBDO_ALLOC(req->rq_oi.oi_oa);
1017                 if (req->rq_oi.oi_oa == NULL) {
1018                         OBD_FREE(req, sizeof(*req));
1019                         GOTO(out_set, rc = -ENOMEM);
1020                 }
1021                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1022                        sizeof(*req->rq_oi.oi_oa));
1023                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1024                 req->rq_oi.oi_cb_up = cb_getattr_update;
1025                 req->rq_oi.oi_capa = oinfo->oi_capa;
1026                 req->rq_rqset = set;
1027
1028                 lov_set_add_req(req, set);
1029         }
1030         if (!set->set_count)
1031                 GOTO(out_set, rc = -EIO);
1032         *reqset = set;
1033         RETURN(rc);
1034 out_set:
1035         lov_fini_getattr_set(set);
1036         RETURN(rc);
1037 }
1038
1039 int lov_fini_destroy_set(struct lov_request_set *set)
1040 {
1041         ENTRY;
1042
1043         if (set == NULL)
1044                 RETURN(0);
1045         LASSERT(set->set_exp);
1046         if (set->set_completes) {
1047                 /* FIXME update qos data here */
1048         }
1049
1050         if (atomic_dec_and_test(&set->set_refcount))
1051                 lov_finish_set(set);
1052
1053         RETURN(0);
1054 }
1055
1056 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1057                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1058                          struct obd_trans_info *oti,
1059                          struct lov_request_set **reqset)
1060 {
1061         struct lov_request_set *set;
1062         struct lov_obd *lov = &exp->exp_obd->u.lov;
1063         int rc = 0, i;
1064         ENTRY;
1065
1066         OBD_ALLOC(set, sizeof(*set));
1067         if (set == NULL)
1068                 RETURN(-ENOMEM);
1069         lov_init_set(set);
1070
1071         set->set_exp = exp;
1072         set->set_oi = oinfo;
1073         set->set_oi->oi_md = lsm;
1074         set->set_oi->oi_oa = src_oa;
1075         set->set_oti = oti;
1076         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1077                 set->set_cookies = oti->oti_logcookies;
1078
1079         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1080                 struct lov_oinfo *loi;
1081                 struct lov_request *req;
1082
1083                 loi = lsm->lsm_oinfo[i];
1084                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1085                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1086                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1087                         continue;
1088                 }
1089
1090                 OBD_ALLOC(req, sizeof(*req));
1091                 if (req == NULL)
1092                         GOTO(out_set, rc = -ENOMEM);
1093
1094                 req->rq_stripe = i;
1095                 req->rq_idx = loi->loi_ost_idx;
1096
1097                 OBDO_ALLOC(req->rq_oi.oi_oa);
1098                 if (req->rq_oi.oi_oa == NULL) {
1099                         OBD_FREE(req, sizeof(*req));
1100                         GOTO(out_set, rc = -ENOMEM);
1101                 }
1102                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1103                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1104                 lov_set_add_req(req, set);
1105         }
1106         if (!set->set_count)
1107                 GOTO(out_set, rc = -EIO);
1108         *reqset = set;
1109         RETURN(rc);
1110 out_set:
1111         lov_fini_destroy_set(set);
1112         RETURN(rc);
1113 }
1114
1115 int lov_fini_setattr_set(struct lov_request_set *set)
1116 {
1117         int rc = 0;
1118         ENTRY;
1119
1120         if (set == NULL)
1121                 RETURN(0);
1122         LASSERT(set->set_exp);
1123         if (set->set_completes) {
1124                 rc = common_attr_done(set);
1125                 /* FIXME update qos data here */
1126         }
1127
1128         if (atomic_dec_and_test(&set->set_refcount))
1129                 lov_finish_set(set);
1130         RETURN(rc);
1131 }
1132
1133 int lov_update_setattr_set(struct lov_request_set *set,
1134                            struct lov_request *req, int rc)
1135 {
1136         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1137         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1138         ENTRY;
1139
1140         lov_update_set(set, req, rc);
1141
1142         /* grace error on inactive ost */
1143         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1144                     lov->lov_tgts[req->rq_idx]->ltd_active))
1145                 rc = 0;
1146
1147         if (rc == 0) {
1148                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1149                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1150                                 req->rq_oi.oi_oa->o_ctime;
1151                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1152                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1153                                 req->rq_oi.oi_oa->o_mtime;
1154                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1155                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1156                                 req->rq_oi.oi_oa->o_atime;
1157         }
1158
1159         RETURN(rc);
1160 }
1161
1162 /* The callback for osc_setattr_async that finilizes a request info when a
1163  * response is recieved. */
1164 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1165 {
1166         struct lov_request *lovreq;
1167         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1168         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1169 }
1170
1171 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1172                          struct obd_trans_info *oti,
1173                          struct lov_request_set **reqset)
1174 {
1175         struct lov_request_set *set;
1176         struct lov_obd *lov = &exp->exp_obd->u.lov;
1177         int rc = 0, i;
1178         ENTRY;
1179
1180         OBD_ALLOC(set, sizeof(*set));
1181         if (set == NULL)
1182                 RETURN(-ENOMEM);
1183         lov_init_set(set);
1184
1185         set->set_exp = exp;
1186         set->set_oti = oti;
1187         set->set_oi = oinfo;
1188         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1189                 set->set_cookies = oti->oti_logcookies;
1190
1191         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1192                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1193                 struct lov_request *req;
1194
1195                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1196                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1197                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1198                         continue;
1199                 }
1200
1201                 OBD_ALLOC(req, sizeof(*req));
1202                 if (req == NULL)
1203                         GOTO(out_set, rc = -ENOMEM);
1204                 req->rq_stripe = i;
1205                 req->rq_idx = loi->loi_ost_idx;
1206
1207                 OBDO_ALLOC(req->rq_oi.oi_oa);
1208                 if (req->rq_oi.oi_oa == NULL) {
1209                         OBD_FREE(req, sizeof(*req));
1210                         GOTO(out_set, rc = -ENOMEM);
1211                 }
1212                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1213                        sizeof(*req->rq_oi.oi_oa));
1214                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1215                 LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
1216                                 || req->rq_oi.oi_oa->o_gr>0);
1217                 req->rq_oi.oi_oa->o_stripe_idx = i;
1218                 req->rq_oi.oi_cb_up = cb_setattr_update;
1219                 req->rq_oi.oi_capa = oinfo->oi_capa;
1220                 req->rq_rqset = set;
1221
1222                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1223                         int off = lov_stripe_offset(oinfo->oi_md,
1224                                                     oinfo->oi_oa->o_size, i,
1225                                                     &req->rq_oi.oi_oa->o_size);
1226
1227                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1228                                 req->rq_oi.oi_oa->o_size--;
1229
1230                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1231                                i, req->rq_oi.oi_oa->o_size,
1232                                oinfo->oi_oa->o_size);
1233                 }
1234                 lov_set_add_req(req, set);
1235         }
1236         if (!set->set_count)
1237                 GOTO(out_set, rc = -EIO);
1238         *reqset = set;
1239         RETURN(rc);
1240 out_set:
1241         lov_fini_setattr_set(set);
1242         RETURN(rc);
1243 }
1244
1245 int lov_fini_punch_set(struct lov_request_set *set)
1246 {
1247         int rc = 0;
1248         ENTRY;
1249
1250         if (set == NULL)
1251                 RETURN(0);
1252         LASSERT(set->set_exp);
1253         if (set->set_completes) {
1254                 rc = -EIO;
1255                 /* FIXME update qos data here */
1256                 if (set->set_success)
1257                         rc = common_attr_done(set);
1258         }
1259
1260         if (atomic_dec_and_test(&set->set_refcount))
1261                 lov_finish_set(set);
1262
1263         RETURN(rc);
1264 }
1265
1266 int lov_update_punch_set(struct lov_request_set *set,
1267                          struct lov_request *req, int rc)
1268 {
1269         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1270         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1271         ENTRY;
1272
1273         lov_update_set(set, req, rc);
1274
1275         /* grace error on inactive ost */
1276         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1277                 rc = 0;
1278
1279         if (rc == 0) {
1280                 lov_stripe_lock(lsm);
1281                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1282                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1283                                 req->rq_oi.oi_oa->o_blocks;
1284                 }
1285
1286                 /* Do we need to update lvb_size here? It needn't because
1287                  * it have been done in ll_truncate(). -jay */
1288                 lov_stripe_unlock(lsm);
1289         }
1290
1291         RETURN(rc);
1292 }
1293
1294 /* The callback for osc_punch that finilizes a request info when a response
1295  * is recieved. */
1296 static int cb_update_punch(struct obd_info *oinfo, int rc)
1297 {
1298         struct lov_request *lovreq;
1299         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1300         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1301 }
1302
1303 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1304                        struct obd_trans_info *oti,
1305                        struct lov_request_set **reqset)
1306 {
1307         struct lov_request_set *set;
1308         struct lov_obd *lov = &exp->exp_obd->u.lov;
1309         int rc = 0, i;
1310         ENTRY;
1311
1312         OBD_ALLOC(set, sizeof(*set));
1313         if (set == NULL)
1314                 RETURN(-ENOMEM);
1315         lov_init_set(set);
1316
1317         set->set_oi = oinfo;
1318         set->set_exp = exp;
1319
1320         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1321                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1322                 struct lov_request *req;
1323                 obd_off rs, re;
1324
1325                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1326                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1327                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1328                         continue;
1329                 }
1330
1331                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1332                                            oinfo->oi_policy.l_extent.start,
1333                                            oinfo->oi_policy.l_extent.end,
1334                                            &rs, &re))
1335                         continue;
1336
1337                 OBD_ALLOC(req, sizeof(*req));
1338                 if (req == NULL)
1339                         GOTO(out_set, rc = -ENOMEM);
1340                 req->rq_stripe = i;
1341                 req->rq_idx = loi->loi_ost_idx;
1342
1343                 OBDO_ALLOC(req->rq_oi.oi_oa);
1344                 if (req->rq_oi.oi_oa == NULL) {
1345                         OBD_FREE(req, sizeof(*req));
1346                         GOTO(out_set, rc = -ENOMEM);
1347                 }
1348                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1349                        sizeof(*req->rq_oi.oi_oa));
1350                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1351                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1352                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1353
1354                 req->rq_oi.oi_oa->o_stripe_idx = i;
1355                 req->rq_oi.oi_cb_up = cb_update_punch;
1356                 req->rq_rqset = set;
1357
1358                 req->rq_oi.oi_policy.l_extent.start = rs;
1359                 req->rq_oi.oi_policy.l_extent.end = re;
1360                 req->rq_oi.oi_policy.l_extent.gid = -1;
1361
1362                 req->rq_oi.oi_capa = oinfo->oi_capa;
1363
1364                 lov_set_add_req(req, set);
1365         }
1366         if (!set->set_count)
1367                 GOTO(out_set, rc = -EIO);
1368         *reqset = set;
1369         RETURN(rc);
1370 out_set:
1371         lov_fini_punch_set(set);
1372         RETURN(rc);
1373 }
1374
1375 int lov_fini_sync_set(struct lov_request_set *set)
1376 {
1377         int rc = 0;
1378         ENTRY;
1379
1380         if (set == NULL)
1381                 RETURN(0);
1382         LASSERT(set->set_exp);
1383         if (set->set_completes) {
1384                 if (!set->set_success)
1385                         rc = -EIO;
1386                 /* FIXME update qos data here */
1387         }
1388
1389         if (atomic_dec_and_test(&set->set_refcount))
1390                 lov_finish_set(set);
1391
1392         RETURN(rc);
1393 }
1394
1395 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1396                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1397                       obd_off start, obd_off end,
1398                       struct lov_request_set **reqset)
1399 {
1400         struct lov_request_set *set;
1401         struct lov_obd *lov = &exp->exp_obd->u.lov;
1402         int rc = 0, i;
1403         ENTRY;
1404
1405         OBD_ALLOC(set, sizeof(*set));
1406         if (set == NULL)
1407                 RETURN(-ENOMEM);
1408         lov_init_set(set);
1409
1410         set->set_exp = exp;
1411         set->set_oi = oinfo;
1412         set->set_oi->oi_md = lsm;
1413         set->set_oi->oi_oa = src_oa;
1414
1415         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1416                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1417                 struct lov_request *req;
1418                 obd_off rs, re;
1419
1420                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1421                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1422                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1423                         continue;
1424                 }
1425
1426                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1427                         continue;
1428
1429                 OBD_ALLOC(req, sizeof(*req));
1430                 if (req == NULL)
1431                         GOTO(out_set, rc = -ENOMEM);
1432                 req->rq_stripe = i;
1433                 req->rq_idx = loi->loi_ost_idx;
1434
1435                 OBDO_ALLOC(req->rq_oi.oi_oa);
1436                 if (req->rq_oi.oi_oa == NULL) {
1437                         OBD_FREE(req, sizeof(*req));
1438                         GOTO(out_set, rc = -ENOMEM);
1439                 }
1440                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1441                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1442                 req->rq_oi.oi_oa->o_stripe_idx = i;
1443
1444                 req->rq_oi.oi_policy.l_extent.start = rs;
1445                 req->rq_oi.oi_policy.l_extent.end = re;
1446                 req->rq_oi.oi_policy.l_extent.gid = -1;
1447
1448                 lov_set_add_req(req, set);
1449         }
1450         if (!set->set_count)
1451                 GOTO(out_set, rc = -EIO);
1452         *reqset = set;
1453         RETURN(rc);
1454 out_set:
1455         lov_fini_sync_set(set);
1456         RETURN(rc);
1457 }
1458
/* All-ones __u64 value; used as the saturation ceiling by LOV_SUM_MAX. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Add "add" to the lvalue "tot", storing LOV_U64_MAX instead when the
 * unsigned sum wraps around.  Both arguments are evaluated more than once,
 * so they must be free of side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1467
1468 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1469 {
1470         ENTRY;
1471
1472         if (success) {
1473                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1474
1475                 if (osfs->os_files != LOV_U64_MAX)
1476                         do_div(osfs->os_files, expected_stripes);
1477                 if (osfs->os_ffree != LOV_U64_MAX)
1478                         do_div(osfs->os_ffree, expected_stripes);
1479
1480                 spin_lock(&obd->obd_osfs_lock);
1481                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1482                 obd->obd_osfs_age = get_jiffies_64();
1483                 spin_unlock(&obd->obd_osfs_lock);
1484                 RETURN(0);
1485         }
1486
1487         RETURN(-EIO);
1488 }
1489
1490 int lov_fini_statfs_set(struct lov_request_set *set)
1491 {
1492         int rc = 0;
1493         ENTRY;
1494
1495         if (set == NULL)
1496                 RETURN(0);
1497
1498         if (set->set_completes) {
1499                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1500                                      set->set_success);
1501         }
1502
1503         if (atomic_dec_and_test(&set->set_refcount))
1504                 lov_finish_set(set);
1505
1506         RETURN(rc);
1507 }
1508
1509 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1510                        struct obd_statfs *lov_sfs, int success)
1511 {
1512         int shift = 0, quit = 0;
1513         __u64 tmp;
1514         spin_lock(&obd->obd_osfs_lock);
1515         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1516         obd->obd_osfs_age = get_jiffies_64();
1517         spin_unlock(&obd->obd_osfs_lock);
1518
1519         if (success == 0) {
1520                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1521         } else {
1522                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1523                         /* assume all block sizes are always powers of 2 */
1524                         /* get the bits difference */
1525                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1526                         for (shift = 0; shift <= 64; ++shift) {
1527                                 if (tmp & 1) {
1528                                         if (quit)
1529                                                 break;
1530                                         else
1531                                                 quit = 1;
1532                                         shift = 0;
1533                                 }
1534                                 tmp >>= 1;
1535                         }
1536                 }
1537
1538                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1539                         osfs->os_bsize = lov_sfs->os_bsize;
1540
1541                         osfs->os_bfree  >>= shift;
1542                         osfs->os_bavail >>= shift;
1543                         osfs->os_blocks >>= shift;
1544                 } else if (shift != 0) {
1545                         lov_sfs->os_bfree  >>= shift;
1546                         lov_sfs->os_bavail >>= shift;
1547                         lov_sfs->os_blocks >>= shift;
1548                 }
1549 #ifdef MIN_DF
1550                 /* Sandia requested that df (and so, statfs) only
1551                    returned minimal available space on
1552                    a single OST, so people would be able to
1553                    write this much data guaranteed. */
1554                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1555                         /* Presumably if new bavail is smaller,
1556                            new bfree is bigger as well */
1557                         osfs->os_bfree = lov_sfs->os_bfree;
1558                         osfs->os_bavail = lov_sfs->os_bavail;
1559                 }
1560 #else
1561                 osfs->os_bfree += lov_sfs->os_bfree;
1562                 osfs->os_bavail += lov_sfs->os_bavail;
1563 #endif
1564                 osfs->os_blocks += lov_sfs->os_blocks;
1565                 /* XXX not sure about this one - depends on policy.
1566                  *   - could be minimum if we always stripe on all OBDs
1567                  *     (but that would be wrong for any other policy,
1568                  *     if one of the OBDs has no more objects left)
1569                  *   - could be sum if we stripe whole objects
1570                  *   - could be average, just to give a nice number
1571                  *
1572                  * To give a "reasonable" (if not wholly accurate)
1573                  * number, we divide the total number of free objects
1574                  * by expected stripe count (watch out for overflow).
1575                  */
1576                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1577                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1578         }
1579 }
1580
1581 /* The callback for osc_statfs_async that finilizes a request info when a
1582  * response is recieved. */
1583 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1584 {
1585         struct lov_request *lovreq;
1586         struct obd_statfs *osfs, *lov_sfs;
1587         struct obd_device *obd;
1588         struct lov_obd *lov;
1589         int success;
1590         ENTRY;
1591
1592         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1593         lov = &lovreq->rq_rqset->set_obd->u.lov;
1594         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1595
1596         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1597         lov_sfs = oinfo->oi_osfs;
1598
1599         success = lovreq->rq_rqset->set_success;
1600
1601         /* XXX: the same is done in lov_update_common_set, however
1602            lovset->set_exp is not initialized. */
1603         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1604         if (rc) {
1605                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1606                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1607                         rc = 0;
1608                 RETURN(rc);
1609         }
1610
1611         lov_update_statfs(obd, osfs, lov_sfs, success);
1612         qos_update(lov);
1613
1614         RETURN(0);
1615 }
1616
1617 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1618                         struct lov_request_set **reqset)
1619 {
1620         struct lov_request_set *set;
1621         struct lov_obd *lov = &obd->u.lov;
1622         int rc = 0, i;
1623         ENTRY;
1624
1625         OBD_ALLOC(set, sizeof(*set));
1626         if (set == NULL)
1627                 RETURN(-ENOMEM);
1628         lov_init_set(set);
1629
1630         set->set_obd = obd;
1631         set->set_oi = oinfo;
1632
1633         /* We only get block data from the OBD */
1634         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1635                 struct lov_request *req;
1636
1637                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1638                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1639                         continue;
1640                 }
1641
1642                 OBD_ALLOC(req, sizeof(*req));
1643                 if (req == NULL)
1644                         GOTO(out_set, rc = -ENOMEM);
1645
1646                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1647                 if (req->rq_oi.oi_osfs == NULL) {
1648                         OBD_FREE(req, sizeof(*req));
1649                         GOTO(out_set, rc = -ENOMEM);
1650                 }
1651
1652                 req->rq_idx = i;
1653                 req->rq_oi.oi_cb_up = cb_statfs_update;
1654                 req->rq_rqset = set;
1655
1656                 lov_set_add_req(req, set);
1657         }
1658         if (!set->set_count)
1659                 GOTO(out_set, rc = -EIO);
1660         *reqset = set;
1661         RETURN(rc);
1662 out_set:
1663         lov_fini_statfs_set(set);
1664         RETURN(rc);
1665 }