Whamcloud - gitweb
correction patch for the lockless i/o code addresses the following problems:
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         OBDO_FREE(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] &&
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         /* -EUSERS used by OST to report file contention */
180                         if (rc != -EINTR && rc != -EUSERS)
181                                 CERROR("enqueue objid "LPX64" subobj "
182                                        LPX64" on OST idx %d: rc %d\n",
183                                        set->set_oi->oi_md->lsm_object_id,
184                                        loi->loi_id, loi->loi_ost_idx, rc);
185                 } else {
186                         rc = ELDLM_OK;
187                 }
188         }
189         lov_update_set(set, req, rc);
190         RETURN(rc);
191 }
192
193 /* The callback for osc_enqueue that updates lov info for every OSC request. */
194 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
195 {
196         struct ldlm_enqueue_info *einfo;
197         struct lov_request *lovreq;
198
199         lovreq = container_of(oinfo, struct lov_request, rq_oi);
200         einfo = lovreq->rq_rqset->set_ei;
201         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
202 }
203
204 static int enqueue_done(struct lov_request_set *set, __u32 mode)
205 {
206         struct lov_request *req;
207         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
208         int rc = 0;
209         ENTRY;
210
211         /* enqueue/match success, just return */
212         if (set->set_completes && set->set_completes == set->set_success)
213                 RETURN(0);
214
215         /* cancel enqueued/matched locks */
216         list_for_each_entry(req, &set->set_list, rq_link) {
217                 struct lustre_handle *lov_lockhp;
218
219                 if (!req->rq_complete || req->rq_rc)
220                         continue;
221
222                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
223                 LASSERT(lov_lockhp);
224                 if (!lustre_handle_is_used(lov_lockhp))
225                         continue;
226
227                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
228                                 req->rq_oi.oi_md, mode, lov_lockhp);
229                 if (rc && lov->lov_tgts[req->rq_idx] &&
230                     lov->lov_tgts[req->rq_idx]->ltd_active)
231                         CERROR("cancelling obdjid "LPX64" on OST "
232                                "idx %d error: rc = %d\n",
233                                req->rq_oi.oi_md->lsm_object_id,
234                                req->rq_idx, rc);
235         }
236         if (set->set_lockh)
237                 lov_llh_put(set->set_lockh);
238         RETURN(rc);
239 }
240
241 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
242                          struct ptlrpc_request_set *rqset)
243 {
244         int ret = 0;
245         ENTRY;
246
247         if (set == NULL)
248                 RETURN(0);
249         LASSERT(set->set_exp);
250         /* Do enqueue_done only for sync requests and if any request
251          * succeeded. */
252         if (!rqset) {
253                 if (rc)
254                         set->set_completes = 0;
255                 ret = enqueue_done(set, mode);
256         } else if (set->set_lockh)
257                 lov_llh_put(set->set_lockh);
258
259         if (atomic_dec_and_test(&set->set_refcount))
260                 lov_finish_set(set);
261
262         RETURN(rc ? rc : ret);
263 }
264
265 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
266                          struct ldlm_enqueue_info *einfo,
267                          struct lov_request_set **reqset)
268 {
269         struct lov_obd *lov = &exp->exp_obd->u.lov;
270         struct lov_request_set *set;
271         int i, rc = 0;
272         struct lov_oinfo *loi;
273         ENTRY;
274
275         OBD_ALLOC(set, sizeof(*set));
276         if (set == NULL)
277                 RETURN(-ENOMEM);
278         lov_init_set(set);
279
280         set->set_exp = exp;
281         set->set_oi = oinfo;
282         set->set_ei = einfo;
283         set->set_lockh = lov_llh_new(oinfo->oi_md);
284         if (set->set_lockh == NULL)
285                 GOTO(out_set, rc = -ENOMEM);
286         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
287
288         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
289                 struct lov_request *req;
290                 obd_off start, end;
291
292                 loi = oinfo->oi_md->lsm_oinfo[i];
293                 if (!lov_stripe_intersects(oinfo->oi_md, i,
294                                            oinfo->oi_policy.l_extent.start,
295                                            oinfo->oi_policy.l_extent.end,
296                                            &start, &end))
297                         continue;
298
299                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
300                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
301                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
302                         continue;
303                 }
304
305                 OBD_ALLOC(req, sizeof(*req));
306                 if (req == NULL)
307                         GOTO(out_set, rc = -ENOMEM);
308
309                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
310                         sizeof(struct lov_oinfo *) +
311                         sizeof(struct lov_oinfo);
312                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
313                 if (req->rq_oi.oi_md == NULL) {
314                         OBD_FREE(req, sizeof(*req));
315                         GOTO(out_set, rc = -ENOMEM);
316                 }
317                 req->rq_oi.oi_md->lsm_oinfo[0] =
318                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
319                         sizeof(struct lov_oinfo *);
320
321
322                 req->rq_rqset = set;
323                 /* Set lov request specific parameters. */
324                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
325                 req->rq_oi.oi_cb_up = cb_update_enqueue;
326                 req->rq_oi.oi_flags = oinfo->oi_flags;
327
328                 LASSERT(req->rq_oi.oi_lockh);
329
330                 req->rq_oi.oi_policy.l_extent.gid =
331                         oinfo->oi_policy.l_extent.gid;
332                 req->rq_oi.oi_policy.l_extent.start = start;
333                 req->rq_oi.oi_policy.l_extent.end = end;
334
335                 req->rq_idx = loi->loi_ost_idx;
336                 req->rq_stripe = i;
337
338                 /* XXX LOV STACKING: submd should be from the subobj */
339                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
340                 req->rq_oi.oi_md->lsm_stripe_count = 0;
341                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
342                         loi->loi_kms_valid;
343                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
344                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
345
346                 lov_set_add_req(req, set);
347         }
348         if (!set->set_count)
349                 GOTO(out_set, rc = -EIO);
350         *reqset = set;
351         RETURN(0);
352 out_set:
353         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
354         RETURN(rc);
355 }
356
357 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
358                          int rc)
359 {
360         int ret = rc;
361         ENTRY;
362
363         if (rc == 1)
364                 ret = 0;
365         else if (rc == 0)
366                 ret = 1;
367         lov_update_set(set, req, ret);
368         RETURN(rc);
369 }
370
371 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
372 {
373         int rc = 0;
374         ENTRY;
375
376         if (set == NULL)
377                 RETURN(0);
378         LASSERT(set->set_exp);
379         rc = enqueue_done(set, mode);
380         if ((set->set_count == set->set_success) &&
381             (flags & LDLM_FL_TEST_LOCK))
382                 lov_llh_put(set->set_lockh);
383
384         if (atomic_dec_and_test(&set->set_refcount))
385                 lov_finish_set(set);
386
387         RETURN(rc);
388 }
389
390 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
391                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
392                        __u32 mode, struct lustre_handle *lockh,
393                        struct lov_request_set **reqset)
394 {
395         struct lov_obd *lov = &exp->exp_obd->u.lov;
396         struct lov_request_set *set;
397         int i, rc = 0;
398         struct lov_oinfo *loi;
399         ENTRY;
400
401         OBD_ALLOC(set, sizeof(*set));
402         if (set == NULL)
403                 RETURN(-ENOMEM);
404         lov_init_set(set);
405
406         set->set_exp = exp;
407         set->set_oi = oinfo;
408         set->set_oi->oi_md = lsm;
409         set->set_lockh = lov_llh_new(lsm);
410         if (set->set_lockh == NULL)
411                 GOTO(out_set, rc = -ENOMEM);
412         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
413
414         for (i = 0; i < lsm->lsm_stripe_count; i++){
415                 struct lov_request *req;
416                 obd_off start, end;
417
418                 loi = lsm->lsm_oinfo[i];
419                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
420                                            policy->l_extent.end, &start, &end))
421                         continue;
422
423                 /* FIXME raid1 should grace this error */
424                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
425                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
426                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
427                         GOTO(out_set, rc = -EIO);
428                 }
429
430                 OBD_ALLOC(req, sizeof(*req));
431                 if (req == NULL)
432                         GOTO(out_set, rc = -ENOMEM);
433
434                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
435                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
436                 if (req->rq_oi.oi_md == NULL) {
437                         OBD_FREE(req, sizeof(*req));
438                         GOTO(out_set, rc = -ENOMEM);
439                 }
440
441                 req->rq_oi.oi_policy.l_extent.start = start;
442                 req->rq_oi.oi_policy.l_extent.end = end;
443                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
444
445                 req->rq_idx = loi->loi_ost_idx;
446                 req->rq_stripe = i;
447
448                 /* XXX LOV STACKING: submd should be from the subobj */
449                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
450                 req->rq_oi.oi_md->lsm_stripe_count = 0;
451
452                 lov_set_add_req(req, set);
453         }
454         if (!set->set_count)
455                 GOTO(out_set, rc = -EIO);
456         *reqset = set;
457         RETURN(rc);
458 out_set:
459         lov_fini_match_set(set, mode, 0);
460         RETURN(rc);
461 }
462
463 int lov_fini_cancel_set(struct lov_request_set *set)
464 {
465         int rc = 0;
466         ENTRY;
467
468         if (set == NULL)
469                 RETURN(0);
470
471         LASSERT(set->set_exp);
472         if (set->set_lockh)
473                 lov_llh_put(set->set_lockh);
474
475         if (atomic_dec_and_test(&set->set_refcount))
476                 lov_finish_set(set);
477
478         RETURN(rc);
479 }
480
481 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
482                         struct lov_stripe_md *lsm, __u32 mode,
483                         struct lustre_handle *lockh,
484                         struct lov_request_set **reqset)
485 {
486         struct lov_request_set *set;
487         int i, rc = 0;
488         struct lov_oinfo *loi;
489         ENTRY;
490
491         OBD_ALLOC(set, sizeof(*set));
492         if (set == NULL)
493                 RETURN(-ENOMEM);
494         lov_init_set(set);
495
496         set->set_exp = exp;
497         set->set_oi = oinfo;
498         set->set_oi->oi_md = lsm;
499         set->set_lockh = lov_handle2llh(lockh);
500         if (set->set_lockh == NULL) {
501                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
502                 GOTO(out_set, rc = -EINVAL);
503         }
504         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
505
506         for (i = 0; i < lsm->lsm_stripe_count; i++){
507                 struct lov_request *req;
508                 struct lustre_handle *lov_lockhp;
509
510                 loi = lsm->lsm_oinfo[i];
511                 lov_lockhp = set->set_lockh->llh_handles + i;
512                 if (!lustre_handle_is_used(lov_lockhp)) {
513                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
514                                loi->loi_ost_idx, loi->loi_id);
515                         continue;
516                 }
517
518                 OBD_ALLOC(req, sizeof(*req));
519                 if (req == NULL)
520                         GOTO(out_set, rc = -ENOMEM);
521
522                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
523                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
524                 if (req->rq_oi.oi_md == NULL) {
525                         OBD_FREE(req, sizeof(*req));
526                         GOTO(out_set, rc = -ENOMEM);
527                 }
528
529                 req->rq_idx = loi->loi_ost_idx;
530                 req->rq_stripe = i;
531
532                 /* XXX LOV STACKING: submd should be from the subobj */
533                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
534                 req->rq_oi.oi_md->lsm_stripe_count = 0;
535
536                 lov_set_add_req(req, set);
537         }
538         if (!set->set_count)
539                 GOTO(out_set, rc = -EIO);
540         *reqset = set;
541         RETURN(rc);
542 out_set:
543         lov_fini_cancel_set(set);
544         RETURN(rc);
545 }
546
547 static int create_done(struct obd_export *exp, struct lov_request_set *set,
548                        struct lov_stripe_md **lsmp)
549 {
550         struct lov_obd *lov = &exp->exp_obd->u.lov;
551         struct obd_trans_info *oti = set->set_oti;
552         struct obdo *src_oa = set->set_oi->oi_oa;
553         struct lov_request *req;
554         struct obdo *ret_oa = NULL;
555         int attrset = 0, rc = 0;
556         ENTRY;
557
558         LASSERT(set->set_completes);
559
560         /* try alloc objects on other osts if osc_create fails for
561          * exceptions: RPC failure, ENOSPC, etc */
562         if (set->set_count != set->set_success) {
563                 list_for_each_entry (req, &set->set_list, rq_link) {
564                         if (req->rq_rc == 0)
565                                 continue;
566
567                         set->set_completes--;
568                         req->rq_complete = 0;
569
570                         rc = qos_remedy_create(set, req);
571                         lov_update_create_set(set, req, rc);
572
573                         if (rc)
574                                 break;
575                 }
576         }
577
578         /* no successful creates */
579         if (set->set_success == 0)
580                 GOTO(cleanup, rc);
581
582         /* If there was an explicit stripe set, fail.  Otherwise, we
583          * got some objects and that's not bad. */
584         if (set->set_count != set->set_success) {
585                 if (*lsmp)
586                         GOTO(cleanup, rc);
587                 set->set_count = set->set_success;
588                 qos_shrink_lsm(set);
589         }
590
591         OBDO_ALLOC(ret_oa);
592         if (ret_oa == NULL)
593                 GOTO(cleanup, rc = -ENOMEM);
594
595         list_for_each_entry(req, &set->set_list, rq_link) {
596                 if (!req->rq_complete || req->rq_rc)
597                         continue;
598                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
599                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
600                                 req->rq_stripe, &attrset);
601         }
602         if (src_oa->o_valid & OBD_MD_FLSIZE &&
603             ret_oa->o_size != src_oa->o_size) {
604                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
605                        src_oa->o_size, ret_oa->o_size);
606                 LBUG();
607         }
608         ret_oa->o_id = src_oa->o_id;
609         memcpy(src_oa, ret_oa, sizeof(*src_oa));
610         OBDO_FREE(ret_oa);
611
612         *lsmp = set->set_oi->oi_md;
613         GOTO(done, rc = 0);
614
615 cleanup:
616         list_for_each_entry(req, &set->set_list, rq_link) {
617                 struct obd_export *sub_exp;
618                 int err = 0;
619
620                 if (!req->rq_complete || req->rq_rc)
621                         continue;
622
623                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
624                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
625                 if (err)
626                         CERROR("Failed to uncreate objid "LPX64" subobj "
627                                LPX64" on OST idx %d: rc = %d\n",
628                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
629                                req->rq_idx, rc);
630         }
631         if (*lsmp == NULL)
632                 obd_free_memmd(exp, &set->set_oi->oi_md);
633 done:
634         if (oti && set->set_cookies) {
635                 oti->oti_logcookies = set->set_cookies;
636                 if (!set->set_cookie_sent) {
637                         oti_free_cookies(oti);
638                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
639                 } else {
640                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
641                 }
642         }
643         RETURN(rc);
644 }
645
646 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
647 {
648         int rc = 0;
649         ENTRY;
650
651         if (set == NULL)
652                 RETURN(0);
653         LASSERT(set->set_exp);
654         if (set->set_completes)
655                 rc = create_done(set->set_exp, set, lsmp);
656
657         if (atomic_dec_and_test(&set->set_refcount))
658                 lov_finish_set(set);
659
660         RETURN(rc);
661 }
662
663 int lov_update_create_set(struct lov_request_set *set,
664                           struct lov_request *req, int rc)
665 {
666         struct obd_trans_info *oti = set->set_oti;
667         struct lov_stripe_md *lsm = set->set_oi->oi_md;
668         struct lov_oinfo *loi;
669         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
670         ENTRY;
671
672         req->rq_stripe = set->set_success;
673         loi = lsm->lsm_oinfo[req->rq_stripe];
674
675         if (rc && lov->lov_tgts[req->rq_idx] &&
676             lov->lov_tgts[req->rq_idx]->ltd_active) {
677                 CERROR("error creating fid "LPX64" sub-object"
678                        " on OST idx %d/%d: rc = %d\n",
679                        set->set_oi->oi_oa->o_id, req->rq_idx,
680                        lsm->lsm_stripe_count, rc);
681                 if (rc > 0) {
682                         CERROR("obd_create returned invalid err %d\n", rc);
683                         rc = -EIO;
684                 }
685         }
686         lov_update_set(set, req, rc);
687         if (rc)
688                 RETURN(rc);
689
690         if (oti && oti->oti_objid)
691                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
692
693         loi->loi_id = req->rq_oi.oi_oa->o_id;
694         loi->loi_ost_idx = req->rq_idx;
695         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
696                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
697         loi_init(loi);
698
699         if (oti && set->set_cookies)
700                 ++oti->oti_logcookies;
701         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
702                 set->set_cookie_sent++;
703
704         RETURN(0);
705 }
706
707 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
708                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
709                         struct obd_trans_info *oti,
710                         struct lov_request_set **reqset)
711 {
712         struct lov_request_set *set;
713         int rc = 0;
714         ENTRY;
715
716         OBD_ALLOC(set, sizeof(*set));
717         if (set == NULL)
718                 RETURN(-ENOMEM);
719         lov_init_set(set);
720
721         set->set_exp = exp;
722         set->set_oi = oinfo;
723         set->set_oi->oi_md = *lsmp;
724         set->set_oi->oi_oa = src_oa;
725         set->set_oti = oti;
726
727         rc = qos_prep_create(exp, set);
728         if (rc)
729                 lov_fini_create_set(set, lsmp);
730         else
731                 *reqset = set;
732         RETURN(rc);
733 }
734
735 static int common_attr_done(struct lov_request_set *set)
736 {
737         struct list_head *pos;
738         struct lov_request *req;
739         struct obdo *tmp_oa;
740         int rc = 0, attrset = 0;
741         ENTRY;
742
743         LASSERT(set->set_oi != NULL);
744
745         if (set->set_oi->oi_oa == NULL)
746                 RETURN(0);
747
748         if (!set->set_success)
749                 RETURN(-EIO);
750
751         OBDO_ALLOC(tmp_oa);
752         if (tmp_oa == NULL)
753                 GOTO(out, rc = -ENOMEM);
754
755         list_for_each (pos, &set->set_list) {
756                 req = list_entry(pos, struct lov_request, rq_link);
757
758                 if (!req->rq_complete || req->rq_rc)
759                         continue;
760                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
761                         continue;
762                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
763                                 req->rq_oi.oi_oa->o_valid,
764                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
765         }
766         if (!attrset) {
767                 CERROR("No stripes had valid attrs\n");
768                 rc = -EIO;
769         }
770         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
771         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
772 out:
773         if (tmp_oa)
774                 OBDO_FREE(tmp_oa);
775         RETURN(rc);
776
777 }
778
779 static int brw_done(struct lov_request_set *set)
780 {
781         struct lov_stripe_md *lsm = set->set_oi->oi_md;
782         struct lov_oinfo     *loi = NULL;
783         struct list_head *pos;
784         struct lov_request *req;
785         ENTRY;
786
787         list_for_each (pos, &set->set_list) {
788                 req = list_entry(pos, struct lov_request, rq_link);
789
790                 if (!req->rq_complete || req->rq_rc)
791                         continue;
792
793                 loi = lsm->lsm_oinfo[req->rq_stripe];
794
795                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
796                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
797         }
798
799         RETURN(0);
800 }
801
802 int lov_fini_brw_set(struct lov_request_set *set)
803 {
804         int rc = 0;
805         ENTRY;
806
807         if (set == NULL)
808                 RETURN(0);
809         LASSERT(set->set_exp);
810         if (set->set_completes) {
811                 rc = brw_done(set);
812                 /* FIXME update qos data here */
813         }
814         if (atomic_dec_and_test(&set->set_refcount))
815                 lov_finish_set(set);
816
817         RETURN(rc);
818 }
819
820 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
821                      obd_count oa_bufs, struct brw_page *pga,
822                      struct obd_trans_info *oti,
823                      struct lov_request_set **reqset)
824 {
825         struct {
826                 obd_count       index;
827                 obd_count       count;
828                 obd_count       off;
829         } *info = NULL;
830         struct lov_request_set *set;
831         struct lov_oinfo *loi = NULL;
832         struct lov_obd *lov = &exp->exp_obd->u.lov;
833         int rc = 0, i, shift;
834         ENTRY;
835
836         OBD_ALLOC(set, sizeof(*set));
837         if (set == NULL)
838                 RETURN(-ENOMEM);
839         lov_init_set(set);
840
841         set->set_exp = exp;
842         set->set_oti = oti;
843         set->set_oi = oinfo;
844         set->set_oabufs = oa_bufs;
845         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
846         if (!set->set_pga)
847                 GOTO(out, rc = -ENOMEM);
848
849         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
850         if (!info)
851                 GOTO(out, rc = -ENOMEM);
852
853         /* calculate the page count for each stripe */
854         for (i = 0; i < oa_bufs; i++) {
855                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
856                 info[stripe].count++;
857         }
858
859         /* alloc and initialize lov request */
860         shift = 0;
861         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
862                 struct lov_request *req;
863
864                 if (info[i].count == 0)
865                         continue;
866
867                 loi = oinfo->oi_md->lsm_oinfo[i];
868                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
869                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
870                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
871                         GOTO(out, rc = -EIO);
872                 }
873
874                 OBD_ALLOC(req, sizeof(*req));
875                 if (req == NULL)
876                         GOTO(out, rc = -ENOMEM);
877
878                 OBDO_ALLOC(req->rq_oi.oi_oa);
879                 if (req->rq_oi.oi_oa == NULL) {
880                         OBD_FREE(req, sizeof(*req));
881                         GOTO(out, rc = -ENOMEM);
882                 }
883
884                 if (oinfo->oi_oa) {
885                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
886                                sizeof(*req->rq_oi.oi_oa));
887                 }
888                 req->rq_oi.oi_oa->o_id = loi->loi_id;
889                 req->rq_oi.oi_oa->o_stripe_idx = i;
890
891                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
892                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
893                 if (req->rq_oi.oi_md == NULL) {
894                         OBDO_FREE(req->rq_oi.oi_oa);
895                         OBD_FREE(req, sizeof(*req));
896                         GOTO(out, rc = -ENOMEM);
897                 }
898
899                 req->rq_idx = loi->loi_ost_idx;
900                 req->rq_stripe = i;
901
902                 /* XXX LOV STACKING */
903                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
904                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
905                 req->rq_oabufs = info[i].count;
906                 req->rq_pgaidx = shift;
907                 shift += req->rq_oabufs;
908
909                 /* remember the index for sort brw_page array */
910                 info[i].index = req->rq_pgaidx;
911
912                 lov_set_add_req(req, set);
913         }
914         if (!set->set_count)
915                 GOTO(out, rc = -EIO);
916
917         /* rotate & sort the brw_page array */
918         for (i = 0; i < oa_bufs; i++) {
919                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
920
921                 shift = info[stripe].index + info[stripe].off;
922                 LASSERT(shift < oa_bufs);
923                 set->set_pga[shift] = pga[i];
924                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
925                                   &set->set_pga[shift].off);
926                 info[stripe].off++;
927         }
928 out:
929         if (info)
930                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
931
932         if (rc == 0)
933                 *reqset = set;
934         else
935                 lov_fini_brw_set(set);
936
937         RETURN(rc);
938 }
939
940 int lov_fini_getattr_set(struct lov_request_set *set)
941 {
942         int rc = 0;
943         ENTRY;
944
945         if (set == NULL)
946                 RETURN(0);
947         LASSERT(set->set_exp);
948         if (set->set_completes)
949                 rc = common_attr_done(set);
950
951         if (atomic_dec_and_test(&set->set_refcount))
952                 lov_finish_set(set);
953
954         RETURN(rc);
955 }
956
957 /* The callback for osc_getattr_async that finilizes a request info when a
958  * response is recieved. */
959 static int cb_getattr_update(struct obd_info *oinfo, int rc)
960 {
961         struct lov_request *lovreq;
962         lovreq = container_of(oinfo, struct lov_request, rq_oi);
963         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
964 }
965
966 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
967                          struct lov_request_set **reqset)
968 {
969         struct lov_request_set *set;
970         struct lov_oinfo *loi = NULL;
971         struct lov_obd *lov = &exp->exp_obd->u.lov;
972         int rc = 0, i;
973         ENTRY;
974
975         OBD_ALLOC(set, sizeof(*set));
976         if (set == NULL)
977                 RETURN(-ENOMEM);
978         lov_init_set(set);
979
980         set->set_exp = exp;
981         set->set_oi = oinfo;
982
983         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
984                 struct lov_request *req;
985
986                 loi = oinfo->oi_md->lsm_oinfo[i];
987                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
988                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
989                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
990                         continue;
991                 }
992
993                 OBD_ALLOC(req, sizeof(*req));
994                 if (req == NULL)
995                         GOTO(out_set, rc = -ENOMEM);
996
997                 req->rq_stripe = i;
998                 req->rq_idx = loi->loi_ost_idx;
999
1000                 OBDO_ALLOC(req->rq_oi.oi_oa);
1001                 if (req->rq_oi.oi_oa == NULL) {
1002                         OBD_FREE(req, sizeof(*req));
1003                         GOTO(out_set, rc = -ENOMEM);
1004                 }
1005                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1006                        sizeof(*req->rq_oi.oi_oa));
1007                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1008                 req->rq_oi.oi_cb_up = cb_getattr_update;
1009
1010                 lov_set_add_req(req, set);
1011         }
1012         if (!set->set_count)
1013                 GOTO(out_set, rc = -EIO);
1014         *reqset = set;
1015         RETURN(rc);
1016 out_set:
1017         lov_fini_getattr_set(set);
1018         RETURN(rc);
1019 }
1020
1021 int lov_fini_destroy_set(struct lov_request_set *set)
1022 {
1023         ENTRY;
1024
1025         if (set == NULL)
1026                 RETURN(0);
1027         LASSERT(set->set_exp);
1028         if (set->set_completes) {
1029                 /* FIXME update qos data here */
1030         }
1031
1032         if (atomic_dec_and_test(&set->set_refcount))
1033                 lov_finish_set(set);
1034
1035         RETURN(0);
1036 }
1037
1038 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1039                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1040                          struct obd_trans_info *oti,
1041                          struct lov_request_set **reqset)
1042 {
1043         struct lov_request_set *set;
1044         struct lov_oinfo *loi = NULL;
1045         struct lov_obd *lov = &exp->exp_obd->u.lov;
1046         int rc = 0, i;
1047         ENTRY;
1048
1049         OBD_ALLOC(set, sizeof(*set));
1050         if (set == NULL)
1051                 RETURN(-ENOMEM);
1052         lov_init_set(set);
1053
1054         set->set_exp = exp;
1055         set->set_oi = oinfo;
1056         set->set_oi->oi_md = lsm;
1057         set->set_oi->oi_oa = src_oa;
1058         set->set_oti = oti;
1059         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1060                 set->set_cookies = oti->oti_logcookies;
1061
1062         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1063                 struct lov_request *req;
1064
1065                 loi = lsm->lsm_oinfo[i];
1066                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1067                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1068                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1069                         continue;
1070                 }
1071
1072                 OBD_ALLOC(req, sizeof(*req));
1073                 if (req == NULL)
1074                         GOTO(out_set, rc = -ENOMEM);
1075
1076                 req->rq_stripe = i;
1077                 req->rq_idx = loi->loi_ost_idx;
1078
1079                 OBDO_ALLOC(req->rq_oi.oi_oa);
1080                 if (req->rq_oi.oi_oa == NULL) {
1081                         OBD_FREE(req, sizeof(*req));
1082                         GOTO(out_set, rc = -ENOMEM);
1083                 }
1084                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1085                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1086                 lov_set_add_req(req, set);
1087         }
1088         if (!set->set_count)
1089                 GOTO(out_set, rc = -EIO);
1090         *reqset = set;
1091         RETURN(rc);
1092 out_set:
1093         lov_fini_destroy_set(set);
1094         RETURN(rc);
1095 }
1096
1097 int lov_fini_setattr_set(struct lov_request_set *set)
1098 {
1099         int rc = 0;
1100         ENTRY;
1101
1102         if (set == NULL)
1103                 RETURN(0);
1104         LASSERT(set->set_exp);
1105         if (set->set_completes) {
1106                 rc = common_attr_done(set);
1107                 /* FIXME update qos data here */
1108         }
1109
1110         if (atomic_dec_and_test(&set->set_refcount))
1111                 lov_finish_set(set);
1112         RETURN(rc);
1113 }
1114
1115 int lov_update_setattr_set(struct lov_request_set *set,
1116                            struct lov_request *req, int rc)
1117 {
1118         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1119         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1120         ENTRY;
1121
1122         lov_update_set(set, req, rc);
1123
1124         /* grace error on inactive ost */
1125         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1126                     lov->lov_tgts[req->rq_idx]->ltd_active))
1127                 rc = 0;
1128
1129         if (rc == 0) {
1130                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1131                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1132                                 req->rq_oi.oi_oa->o_ctime;
1133                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1134                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1135                                 req->rq_oi.oi_oa->o_mtime;
1136                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1137                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1138                                 req->rq_oi.oi_oa->o_atime;
1139         }
1140
1141         RETURN(rc);
1142 }
1143
1144 /* The callback for osc_setattr_async that finilizes a request info when a
1145  * response is recieved. */
1146 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1147 {
1148         struct lov_request *lovreq;
1149         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1150         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1151 }
1152
1153 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1154                          struct obd_trans_info *oti,
1155                          struct lov_request_set **reqset)
1156 {
1157         struct lov_request_set *set;
1158         struct lov_oinfo *loi = NULL;
1159         struct lov_obd *lov = &exp->exp_obd->u.lov;
1160         int rc = 0, i;
1161         ENTRY;
1162
1163         OBD_ALLOC(set, sizeof(*set));
1164         if (set == NULL)
1165                 RETURN(-ENOMEM);
1166         lov_init_set(set);
1167
1168         set->set_exp = exp;
1169         set->set_oti = oti;
1170         set->set_oi = oinfo;
1171         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1172                 set->set_cookies = oti->oti_logcookies;
1173
1174         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1175                 struct lov_request *req;
1176
1177                 loi = oinfo->oi_md->lsm_oinfo[i];
1178                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1179                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1180                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1181                         continue;
1182                 }
1183
1184                 OBD_ALLOC(req, sizeof(*req));
1185                 if (req == NULL)
1186                         GOTO(out_set, rc = -ENOMEM);
1187                 req->rq_stripe = i;
1188                 req->rq_idx = loi->loi_ost_idx;
1189
1190                 OBDO_ALLOC(req->rq_oi.oi_oa);
1191                 if (req->rq_oi.oi_oa == NULL) {
1192                         OBD_FREE(req, sizeof(*req));
1193                         GOTO(out_set, rc = -ENOMEM);
1194                 }
1195                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1196                        sizeof(*req->rq_oi.oi_oa));
1197                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1198                 req->rq_oi.oi_oa->o_stripe_idx = i;
1199                 req->rq_oi.oi_cb_up = cb_setattr_update;
1200                 req->rq_rqset = set;
1201
1202                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1203                         int off = lov_stripe_offset(oinfo->oi_md,
1204                                                     oinfo->oi_oa->o_size, i,
1205                                                     &req->rq_oi.oi_oa->o_size);
1206
1207                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1208                                 req->rq_oi.oi_oa->o_size--;
1209
1210                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1211                                i, req->rq_oi.oi_oa->o_size,
1212                                oinfo->oi_oa->o_size);
1213                 }
1214                 lov_set_add_req(req, set);
1215         }
1216         if (!set->set_count)
1217                 GOTO(out_set, rc = -EIO);
1218         *reqset = set;
1219         RETURN(rc);
1220 out_set:
1221         lov_fini_setattr_set(set);
1222         RETURN(rc);
1223 }
1224
1225 int lov_fini_punch_set(struct lov_request_set *set)
1226 {
1227         int rc = 0;
1228         ENTRY;
1229
1230         if (set == NULL)
1231                 RETURN(0);
1232         LASSERT(set->set_exp);
1233         if (set->set_completes) {
1234                 rc = -EIO;
1235                 /* FIXME update qos data here */
1236                 if (set->set_success)
1237                         rc = common_attr_done(set);
1238         }
1239
1240         if (atomic_dec_and_test(&set->set_refcount))
1241                 lov_finish_set(set);
1242
1243         RETURN(rc);
1244 }
1245
1246 int lov_update_punch_set(struct lov_request_set *set,
1247                            struct lov_request *req, int rc)
1248 {
1249         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1250         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1251         ENTRY;
1252
1253         lov_update_set(set, req, rc);
1254
1255         /* grace error on inactive ost */
1256         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1257                 rc = 0;
1258
1259         if (rc == 0) {
1260                 lov_stripe_lock(lsm);
1261                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1262                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1263                                 req->rq_oi.oi_oa->o_blocks;
1264                 }
1265
1266                 /* Do we need to update lvb_size here? It needn't because
1267                  * it have been done in ll_truncate(). -jay */
1268                 lov_stripe_unlock(lsm);
1269         }
1270
1271         RETURN(rc);
1272 }
1273
1274 /* The callback for osc_punch that finilizes a request info when a response
1275  * is recieved. */
1276 static int cb_update_punch(struct obd_info *oinfo, int rc)
1277 {
1278         struct lov_request *lovreq;
1279         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1280         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1281 }
1282
1283 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1284                        struct obd_trans_info *oti,
1285                        struct lov_request_set **reqset)
1286 {
1287         struct lov_request_set *set;
1288         struct lov_oinfo *loi = NULL;
1289         struct lov_obd *lov = &exp->exp_obd->u.lov;
1290         int rc = 0, i;
1291         ENTRY;
1292
1293         OBD_ALLOC(set, sizeof(*set));
1294         if (set == NULL)
1295                 RETURN(-ENOMEM);
1296         lov_init_set(set);
1297
1298         set->set_oi = oinfo;
1299         set->set_exp = exp;
1300
1301         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1302                 struct lov_request *req;
1303                 obd_off rs, re;
1304
1305                 loi = oinfo->oi_md->lsm_oinfo[i];
1306                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1307                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1308                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1309                         continue;
1310                 }
1311
1312                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1313                                            oinfo->oi_policy.l_extent.start,
1314                                            oinfo->oi_policy.l_extent.end,
1315                                            &rs, &re))
1316                         continue;
1317
1318                 OBD_ALLOC(req, sizeof(*req));
1319                 if (req == NULL)
1320                         GOTO(out_set, rc = -ENOMEM);
1321                 req->rq_stripe = i;
1322                 req->rq_idx = loi->loi_ost_idx;
1323
1324                 OBDO_ALLOC(req->rq_oi.oi_oa);
1325                 if (req->rq_oi.oi_oa == NULL) {
1326                         OBD_FREE(req, sizeof(*req));
1327                         GOTO(out_set, rc = -ENOMEM);
1328                 }
1329                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1330                        sizeof(*req->rq_oi.oi_oa));
1331                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1332                 req->rq_oi.oi_oa->o_stripe_idx = i;
1333                 req->rq_oi.oi_cb_up = cb_update_punch;
1334                 req->rq_rqset = set;
1335
1336                 req->rq_oi.oi_policy.l_extent.start = rs;
1337                 req->rq_oi.oi_policy.l_extent.end = re;
1338                 req->rq_oi.oi_policy.l_extent.gid = -1;
1339
1340                 lov_set_add_req(req, set);
1341         }
1342         if (!set->set_count)
1343                 GOTO(out_set, rc = -EIO);
1344         *reqset = set;
1345         RETURN(rc);
1346 out_set:
1347         lov_fini_punch_set(set);
1348         RETURN(rc);
1349 }
1350
1351 int lov_fini_sync_set(struct lov_request_set *set)
1352 {
1353         int rc = 0;
1354         ENTRY;
1355
1356         if (set == NULL)
1357                 RETURN(0);
1358         LASSERT(set->set_exp);
1359         if (set->set_completes) {
1360                 if (!set->set_success)
1361                         rc = -EIO;
1362                 /* FIXME update qos data here */
1363         }
1364
1365         if (atomic_dec_and_test(&set->set_refcount))
1366                 lov_finish_set(set);
1367
1368         RETURN(rc);
1369 }
1370
1371 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1372                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1373                       obd_off start, obd_off end,
1374                       struct lov_request_set **reqset)
1375 {
1376         struct lov_request_set *set;
1377         struct lov_oinfo *loi = NULL;
1378         struct lov_obd *lov = &exp->exp_obd->u.lov;
1379         int rc = 0, i;
1380         ENTRY;
1381
1382         OBD_ALLOC(set, sizeof(*set));
1383         if (set == NULL)
1384                 RETURN(-ENOMEM);
1385         lov_init_set(set);
1386
1387         set->set_exp = exp;
1388         set->set_oi = oinfo;
1389         set->set_oi->oi_md = lsm;
1390         set->set_oi->oi_oa = src_oa;
1391
1392         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1393                 struct lov_request *req;
1394                 obd_off rs, re;
1395
1396                 loi = lsm->lsm_oinfo[i];
1397                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1398                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1399                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1400                         continue;
1401                 }
1402
1403                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1404                         continue;
1405
1406                 OBD_ALLOC(req, sizeof(*req));
1407                 if (req == NULL)
1408                         GOTO(out_set, rc = -ENOMEM);
1409                 req->rq_stripe = i;
1410                 req->rq_idx = loi->loi_ost_idx;
1411
1412                 OBDO_ALLOC(req->rq_oi.oi_oa);
1413                 if (req->rq_oi.oi_oa == NULL) {
1414                         OBD_FREE(req, sizeof(*req));
1415                         GOTO(out_set, rc = -ENOMEM);
1416                 }
1417                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1418                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1419                 req->rq_oi.oi_oa->o_stripe_idx = i;
1420
1421                 req->rq_oi.oi_policy.l_extent.start = rs;
1422                 req->rq_oi.oi_policy.l_extent.end = re;
1423                 req->rq_oi.oi_policy.l_extent.gid = -1;
1424
1425                 lov_set_add_req(req, set);
1426         }
1427         if (!set->set_count)
1428                 GOTO(out_set, rc = -EIO);
1429         *reqset = set;
1430         RETURN(rc);
1431 out_set:
1432         lov_fini_sync_set(set);
1433         RETURN(rc);
1434 }
1435
/* Largest value representable in a __u64; doubles as the "saturated" mark. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating add: tot += add, pinned at LOV_U64_MAX if the sum would wrap.
 * Both arguments are evaluated more than once, so pass plain lvalues and
 * values without side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1444
1445 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1446 {
1447         ENTRY;
1448
1449         if (success) {
1450                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1451
1452                 if (osfs->os_files != LOV_U64_MAX)
1453                         do_div(osfs->os_files, expected_stripes);
1454                 if (osfs->os_ffree != LOV_U64_MAX)
1455                         do_div(osfs->os_ffree, expected_stripes);
1456
1457                 spin_lock(&obd->obd_osfs_lock);
1458                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1459                 obd->obd_osfs_age = get_jiffies_64();
1460                 spin_unlock(&obd->obd_osfs_lock);
1461                 RETURN(0);
1462         }
1463
1464         RETURN(-EIO);
1465 }
1466
1467 int lov_fini_statfs_set(struct lov_request_set *set)
1468 {
1469         int rc = 0;
1470         ENTRY;
1471
1472         if (set == NULL)
1473                 RETURN(0);
1474
1475         if (set->set_completes) {
1476                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1477                                      set->set_success);
1478         }
1479
1480         if (atomic_dec_and_test(&set->set_refcount))
1481                 lov_finish_set(set);
1482
1483         RETURN(rc);
1484 }
1485
1486 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1487                        struct obd_statfs *lov_sfs, int success)
1488 {
1489         int shift = 0, quit = 0;
1490         __u64 tmp;
1491         spin_lock(&obd->obd_osfs_lock);
1492         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1493         obd->obd_osfs_age = get_jiffies_64();
1494         spin_unlock(&obd->obd_osfs_lock);
1495
1496         if (success == 0) {
1497                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1498         } else {
1499                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1500                         /* assume all block sizes are always powers of 2 */
1501                         /* get the bits difference */
1502                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1503                         for (shift = 0; shift <= 64; ++shift) {
1504                                 if (tmp & 1) {
1505                                         if (quit)
1506                                                 break;
1507                                         else
1508                                                 quit = 1;
1509                                         shift = 0;
1510                                 }
1511                                 tmp >>= 1;
1512                         }
1513                 }
1514
1515                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1516                         osfs->os_bsize = lov_sfs->os_bsize;
1517
1518                         osfs->os_bfree  >>= shift;
1519                         osfs->os_bavail >>= shift;
1520                         osfs->os_blocks >>= shift;
1521                 } else if (shift != 0) {
1522                         lov_sfs->os_bfree  >>= shift;
1523                         lov_sfs->os_bavail >>= shift;
1524                         lov_sfs->os_blocks >>= shift;
1525                 }
1526 #ifdef MIN_DF
1527                 /* Sandia requested that df (and so, statfs) only
1528                    returned minimal available space on
1529                    a single OST, so people would be able to
1530                    write this much data guaranteed. */
1531                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1532                         /* Presumably if new bavail is smaller,
1533                            new bfree is bigger as well */
1534                         osfs->os_bfree = lov_sfs->os_bfree;
1535                         osfs->os_bavail = lov_sfs->os_bavail;
1536                 }
1537 #else
1538                 osfs->os_bfree += lov_sfs->os_bfree;
1539                 osfs->os_bavail += lov_sfs->os_bavail;
1540 #endif
1541                 osfs->os_blocks += lov_sfs->os_blocks;
1542                 /* XXX not sure about this one - depends on policy.
1543                  *   - could be minimum if we always stripe on all OBDs
1544                  *     (but that would be wrong for any other policy,
1545                  *     if one of the OBDs has no more objects left)
1546                  *   - could be sum if we stripe whole objects
1547                  *   - could be average, just to give a nice number
1548                  *
1549                  * To give a "reasonable" (if not wholly accurate)
1550                  * number, we divide the total number of free objects
1551                  * by expected stripe count (watch out for overflow).
1552                  */
1553                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1554                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1555         }
1556 }
1557
1558 /* The callback for osc_statfs_async that finilizes a request info when a
1559  * response is recieved. */
1560 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1561 {
1562         struct lov_request *lovreq;
1563         struct obd_statfs *osfs, *lov_sfs;
1564         struct obd_device *obd;
1565         struct lov_obd *lov;
1566         int success;
1567         ENTRY;
1568
1569         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1570         lov = &lovreq->rq_rqset->set_obd->u.lov;
1571         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1572
1573         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1574         lov_sfs = oinfo->oi_osfs;
1575
1576         success = lovreq->rq_rqset->set_success;
1577
1578         /* XXX: the same is done in lov_update_common_set, however
1579            lovset->set_exp is not initialized. */
1580         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1581         if (rc) {
1582                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1583                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1584                         rc = 0;
1585                 RETURN(rc);
1586         }
1587
1588         lov_update_statfs(obd, osfs, lov_sfs, success);
1589         qos_update(lov);
1590
1591         RETURN(0);
1592 }
1593
1594 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1595                         struct lov_request_set **reqset)
1596 {
1597         struct lov_request_set *set;
1598         struct lov_obd *lov = &obd->u.lov;
1599         int rc = 0, i;
1600         ENTRY;
1601
1602         OBD_ALLOC(set, sizeof(*set));
1603         if (set == NULL)
1604                 RETURN(-ENOMEM);
1605         lov_init_set(set);
1606
1607         set->set_obd = obd;
1608         set->set_oi = oinfo;
1609
1610         /* We only get block data from the OBD */
1611         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1612                 struct lov_request *req;
1613
1614                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1615                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1616                         continue;
1617                 }
1618
1619                 OBD_ALLOC(req, sizeof(*req));
1620                 if (req == NULL)
1621                         GOTO(out_set, rc = -ENOMEM);
1622
1623                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1624                 if (req->rq_oi.oi_osfs == NULL) {
1625                         OBD_FREE(req, sizeof(*req));
1626                         GOTO(out_set, rc = -ENOMEM);
1627                 }
1628
1629                 req->rq_idx = i;
1630                 req->rq_oi.oi_cb_up = cb_statfs_update;
1631                 req->rq_rqset = set;
1632
1633                 lov_set_add_req(req, set);
1634         }
1635         if (!set->set_count)
1636                 GOTO(out_set, rc = -EIO);
1637         *reqset = set;
1638         RETURN(rc);
1639 out_set:
1640         lov_fini_statfs_set(set);
1641         RETURN(rc);
1642 }