Whamcloud - gitweb
LU-657 test: limit the write size in run_dd
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #ifdef __KERNEL__
40 #include <libcfs/libcfs.h>
41 #else
42 #include <liblustre.h>
43 #endif
44
45 #include <obd_class.h>
46 #include <obd_lov.h>
47 #include <lustre/lustre_idl.h>
48
49 #include "lov_internal.h"
50
51 static void lov_init_set(struct lov_request_set *set)
52 {
53         set->set_count = 0;
54         cfs_atomic_set(&set->set_completes, 0);
55         cfs_atomic_set(&set->set_success, 0);
56         cfs_atomic_set(&set->set_finish_checked, 0);
57         set->set_cookies = 0;
58         CFS_INIT_LIST_HEAD(&set->set_list);
59         cfs_atomic_set(&set->set_refcount, 1);
60         cfs_waitq_init(&set->set_waitq);
61         spin_lock_init(&set->set_lock);
62 }
63
64 void lov_finish_set(struct lov_request_set *set)
65 {
66         cfs_list_t *pos, *n;
67         ENTRY;
68
69         LASSERT(set);
70         cfs_list_for_each_safe(pos, n, &set->set_list) {
71                 struct lov_request *req = cfs_list_entry(pos,
72                                                          struct lov_request,
73                                                          rq_link);
74                 cfs_list_del_init(&req->rq_link);
75
76                 if (req->rq_oi.oi_oa)
77                         OBDO_FREE(req->rq_oi.oi_oa);
78                 if (req->rq_oi.oi_md)
79                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
80                 if (req->rq_oi.oi_osfs)
81                         OBD_FREE(req->rq_oi.oi_osfs,
82                                  sizeof(*req->rq_oi.oi_osfs));
83                 OBD_FREE(req, sizeof(*req));
84         }
85
86         if (set->set_pga) {
87                 int len = set->set_oabufs * sizeof(*set->set_pga);
88                 OBD_FREE_LARGE(set->set_pga, len);
89         }
90         if (set->set_lockh)
91                 lov_llh_put(set->set_lockh);
92
93         OBD_FREE(set, sizeof(*set));
94         EXIT;
95 }
96
97 int lov_set_finished(struct lov_request_set *set, int idempotent)
98 {
99         int completes = cfs_atomic_read(&set->set_completes);
100
101         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
102
103         if (completes == set->set_count) {
104                 if (idempotent)
105                         return 1;
106                 if (cfs_atomic_inc_return(&set->set_finish_checked) == 1)
107                         return 1;
108         }
109         return 0;
110 }
111
112 void lov_update_set(struct lov_request_set *set,
113                     struct lov_request *req, int rc)
114 {
115         req->rq_complete = 1;
116         req->rq_rc = rc;
117
118         cfs_atomic_inc(&set->set_completes);
119         if (rc == 0)
120                 cfs_atomic_inc(&set->set_success);
121
122         cfs_waitq_signal(&set->set_waitq);
123 }
124
125 int lov_update_common_set(struct lov_request_set *set,
126                           struct lov_request *req, int rc)
127 {
128         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
129         ENTRY;
130
131         lov_update_set(set, req, rc);
132
133         /* grace error on inactive ost */
134         if (rc && !(lov->lov_tgts[req->rq_idx] &&
135                     lov->lov_tgts[req->rq_idx]->ltd_active))
136                 rc = 0;
137
138         /* FIXME in raid1 regime, should return 0 */
139         RETURN(rc);
140 }
141
142 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
143 {
144         cfs_list_add_tail(&req->rq_link, &set->set_list);
145         set->set_count++;
146         req->rq_rqset = set;
147 }
148
149 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
150                                struct lov_oinfo *loi, int flags,
151                                struct ost_lvb *lvb, __u32 mode, int rc);
152
153 static int lov_update_enqueue_lov(struct obd_export *exp,
154                                   struct lustre_handle *lov_lockhp,
155                                   struct lov_oinfo *loi, int flags, int idx,
156                                   __u64 oid, int rc)
157 {
158         struct lov_obd *lov = &exp->exp_obd->u.lov;
159
160         if (rc != ELDLM_OK &&
161             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
162                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
163                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
164                         /* -EUSERS used by OST to report file contention */
165                         if (rc != -EINTR && rc != -EUSERS)
166                                 CERROR("enqueue objid "LPX64" subobj "
167                                        LPX64" on OST idx %d: rc %d\n",
168                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
169                 } else
170                         rc = ELDLM_OK;
171         }
172         return rc;
173 }
174
175 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
176 {
177         struct lov_request_set *set = req->rq_rqset;
178         struct lustre_handle *lov_lockhp;
179         struct obd_info *oi = set->set_oi;
180         struct lov_oinfo *loi;
181         ENTRY;
182
183         LASSERT(oi != NULL);
184
185         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
186         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
187
188         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
189          * and that copy can be arbitrarily out of date.
190          *
191          * The LOV API is due for a serious rewriting anyways, and this
192          * can be addressed then. */
193
194         lov_stripe_lock(oi->oi_md);
195         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
196                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
197         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
198                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
199         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
200                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
201         lov_stripe_unlock(oi->oi_md);
202         lov_update_set(set, req, rc);
203         RETURN(rc);
204 }
205
206 /* The callback for osc_enqueue that updates lov info for every OSC request. */
207 static int cb_update_enqueue(void *cookie, int rc)
208 {
209         struct obd_info *oinfo = cookie;
210         struct ldlm_enqueue_info *einfo;
211         struct lov_request *lovreq;
212
213         lovreq = container_of(oinfo, struct lov_request, rq_oi);
214         einfo = lovreq->rq_rqset->set_ei;
215         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
216 }
217
218 static int enqueue_done(struct lov_request_set *set, __u32 mode)
219 {
220         struct lov_request *req;
221         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
222         int completes = cfs_atomic_read(&set->set_completes);
223         int rc = 0;
224         ENTRY;
225
226         /* enqueue/match success, just return */
227         if (completes && completes == cfs_atomic_read(&set->set_success))
228                 RETURN(0);
229
230         /* cancel enqueued/matched locks */
231         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
232                 struct lustre_handle *lov_lockhp;
233
234                 if (!req->rq_complete || req->rq_rc)
235                         continue;
236
237                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
238                 LASSERT(lov_lockhp);
239                 if (!lustre_handle_is_used(lov_lockhp))
240                         continue;
241
242                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
243                                 req->rq_oi.oi_md, mode, lov_lockhp);
244                 if (rc && lov->lov_tgts[req->rq_idx] &&
245                     lov->lov_tgts[req->rq_idx]->ltd_active)
246                         CERROR("cancelling obdjid "LPX64" on OST "
247                                "idx %d error: rc = %d\n",
248                                req->rq_oi.oi_md->lsm_object_id,
249                                req->rq_idx, rc);
250         }
251         if (set->set_lockh)
252                 lov_llh_put(set->set_lockh);
253         RETURN(rc);
254 }
255
256 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
257                          struct ptlrpc_request_set *rqset)
258 {
259         int ret = 0;
260         ENTRY;
261
262         if (set == NULL)
263                 RETURN(0);
264         LASSERT(set->set_exp);
265         /* Do enqueue_done only for sync requests and if any request
266          * succeeded. */
267         if (!rqset) {
268                 if (rc)
269                         cfs_atomic_set(&set->set_completes, 0);
270                 ret = enqueue_done(set, mode);
271         } else if (set->set_lockh)
272                 lov_llh_put(set->set_lockh);
273
274         lov_put_reqset(set);
275
276         RETURN(rc ? rc : ret);
277 }
278
279 static void lov_llh_addref(void *llhp)
280 {
281         struct lov_lock_handles *llh = llhp;
282
283         cfs_atomic_inc(&llh->llh_refcount);
284         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
285                cfs_atomic_read(&llh->llh_refcount));
286 }
287
288 static struct portals_handle_ops lov_handle_ops = {
289         .hop_addref = lov_llh_addref,
290         .hop_free   = NULL,
291 };
292
293 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
294 {
295         struct lov_lock_handles *llh;
296
297         OBD_ALLOC(llh, sizeof *llh +
298                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
299         if (llh == NULL)
300                 return NULL;
301
302         cfs_atomic_set(&llh->llh_refcount, 2);
303         llh->llh_stripe_count = lsm->lsm_stripe_count;
304         CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link);
305         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
306
307         return llh;
308 }
309
310 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
311                          struct ldlm_enqueue_info *einfo,
312                          struct lov_request_set **reqset)
313 {
314         struct lov_obd *lov = &exp->exp_obd->u.lov;
315         struct lov_request_set *set;
316         int i, rc = 0;
317         ENTRY;
318
319         OBD_ALLOC(set, sizeof(*set));
320         if (set == NULL)
321                 RETURN(-ENOMEM);
322         lov_init_set(set);
323
324         set->set_exp = exp;
325         set->set_oi = oinfo;
326         set->set_ei = einfo;
327         set->set_lockh = lov_llh_new(oinfo->oi_md);
328         if (set->set_lockh == NULL)
329                 GOTO(out_set, rc = -ENOMEM);
330         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
331
332         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
333                 struct lov_oinfo *loi;
334                 struct lov_request *req;
335                 obd_off start, end;
336
337                 loi = oinfo->oi_md->lsm_oinfo[i];
338                 if (!lov_stripe_intersects(oinfo->oi_md, i,
339                                            oinfo->oi_policy.l_extent.start,
340                                            oinfo->oi_policy.l_extent.end,
341                                            &start, &end))
342                         continue;
343
344                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
345                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
346                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
347                         continue;
348                 }
349
350                 OBD_ALLOC(req, sizeof(*req));
351                 if (req == NULL)
352                         GOTO(out_set, rc = -ENOMEM);
353
354                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
355                         sizeof(struct lov_oinfo *) +
356                         sizeof(struct lov_oinfo);
357                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
358                 if (req->rq_oi.oi_md == NULL) {
359                         OBD_FREE(req, sizeof(*req));
360                         GOTO(out_set, rc = -ENOMEM);
361                 }
362                 req->rq_oi.oi_md->lsm_oinfo[0] =
363                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
364                         sizeof(struct lov_oinfo *);
365
366                 /* Set lov request specific parameters. */
367                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
368                 req->rq_oi.oi_cb_up = cb_update_enqueue;
369                 req->rq_oi.oi_flags = oinfo->oi_flags;
370
371                 LASSERT(req->rq_oi.oi_lockh);
372
373                 req->rq_oi.oi_policy.l_extent.gid =
374                         oinfo->oi_policy.l_extent.gid;
375                 req->rq_oi.oi_policy.l_extent.start = start;
376                 req->rq_oi.oi_policy.l_extent.end = end;
377
378                 req->rq_idx = loi->loi_ost_idx;
379                 req->rq_stripe = i;
380
381                 /* XXX LOV STACKING: submd should be from the subobj */
382                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
383                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
384                 req->rq_oi.oi_md->lsm_stripe_count = 0;
385                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
386                         loi->loi_kms_valid;
387                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
388                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
389
390                 lov_set_add_req(req, set);
391         }
392         if (!set->set_count)
393                 GOTO(out_set, rc = -EIO);
394         *reqset = set;
395         RETURN(0);
396 out_set:
397         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
398         RETURN(rc);
399 }
400
401 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
402 {
403         int rc = 0;
404         ENTRY;
405
406         if (set == NULL)
407                 RETURN(0);
408         LASSERT(set->set_exp);
409         rc = enqueue_done(set, mode);
410         if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
411             (flags & LDLM_FL_TEST_LOCK))
412                 lov_llh_put(set->set_lockh);
413
414         lov_put_reqset(set);
415
416         RETURN(rc);
417 }
418
419 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
420                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
421                        __u32 mode, struct lustre_handle *lockh,
422                        struct lov_request_set **reqset)
423 {
424         struct lov_obd *lov = &exp->exp_obd->u.lov;
425         struct lov_request_set *set;
426         int i, rc = 0;
427         ENTRY;
428
429         OBD_ALLOC(set, sizeof(*set));
430         if (set == NULL)
431                 RETURN(-ENOMEM);
432         lov_init_set(set);
433
434         set->set_exp = exp;
435         set->set_oi = oinfo;
436         set->set_oi->oi_md = lsm;
437         set->set_lockh = lov_llh_new(lsm);
438         if (set->set_lockh == NULL)
439                 GOTO(out_set, rc = -ENOMEM);
440         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
441
442         for (i = 0; i < lsm->lsm_stripe_count; i++){
443                 struct lov_oinfo *loi;
444                 struct lov_request *req;
445                 obd_off start, end;
446
447                 loi = lsm->lsm_oinfo[i];
448                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
449                                            policy->l_extent.end, &start, &end))
450                         continue;
451
452                 /* FIXME raid1 should grace this error */
453                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
454                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
455                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
456                         GOTO(out_set, rc = -EIO);
457                 }
458
459                 OBD_ALLOC(req, sizeof(*req));
460                 if (req == NULL)
461                         GOTO(out_set, rc = -ENOMEM);
462
463                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
464                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
465                 if (req->rq_oi.oi_md == NULL) {
466                         OBD_FREE(req, sizeof(*req));
467                         GOTO(out_set, rc = -ENOMEM);
468                 }
469
470                 req->rq_oi.oi_policy.l_extent.start = start;
471                 req->rq_oi.oi_policy.l_extent.end = end;
472                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
473
474                 req->rq_idx = loi->loi_ost_idx;
475                 req->rq_stripe = i;
476
477                 /* XXX LOV STACKING: submd should be from the subobj */
478                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
479                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
480                 req->rq_oi.oi_md->lsm_stripe_count = 0;
481
482                 lov_set_add_req(req, set);
483         }
484         if (!set->set_count)
485                 GOTO(out_set, rc = -EIO);
486         *reqset = set;
487         RETURN(rc);
488 out_set:
489         lov_fini_match_set(set, mode, 0);
490         RETURN(rc);
491 }
492
493 int lov_fini_cancel_set(struct lov_request_set *set)
494 {
495         int rc = 0;
496         ENTRY;
497
498         if (set == NULL)
499                 RETURN(0);
500
501         LASSERT(set->set_exp);
502         if (set->set_lockh)
503                 lov_llh_put(set->set_lockh);
504
505         lov_put_reqset(set);
506
507         RETURN(rc);
508 }
509
510 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
511                         struct lov_stripe_md *lsm, __u32 mode,
512                         struct lustre_handle *lockh,
513                         struct lov_request_set **reqset)
514 {
515         struct lov_request_set *set;
516         int i, rc = 0;
517         ENTRY;
518
519         OBD_ALLOC(set, sizeof(*set));
520         if (set == NULL)
521                 RETURN(-ENOMEM);
522         lov_init_set(set);
523
524         set->set_exp = exp;
525         set->set_oi = oinfo;
526         set->set_oi->oi_md = lsm;
527         set->set_lockh = lov_handle2llh(lockh);
528         if (set->set_lockh == NULL) {
529                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
530                 GOTO(out_set, rc = -EINVAL);
531         }
532         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
533
534         for (i = 0; i < lsm->lsm_stripe_count; i++){
535                 struct lov_request *req;
536                 struct lustre_handle *lov_lockhp;
537                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
538
539                 lov_lockhp = set->set_lockh->llh_handles + i;
540                 if (!lustre_handle_is_used(lov_lockhp)) {
541                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
542                                loi->loi_ost_idx, loi->loi_id);
543                         continue;
544                 }
545
546                 OBD_ALLOC(req, sizeof(*req));
547                 if (req == NULL)
548                         GOTO(out_set, rc = -ENOMEM);
549
550                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
551                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
552                 if (req->rq_oi.oi_md == NULL) {
553                         OBD_FREE(req, sizeof(*req));
554                         GOTO(out_set, rc = -ENOMEM);
555                 }
556
557                 req->rq_idx = loi->loi_ost_idx;
558                 req->rq_stripe = i;
559
560                 /* XXX LOV STACKING: submd should be from the subobj */
561                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
562                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
563                 req->rq_oi.oi_md->lsm_stripe_count = 0;
564
565                 lov_set_add_req(req, set);
566         }
567         if (!set->set_count)
568                 GOTO(out_set, rc = -EIO);
569         *reqset = set;
570         RETURN(rc);
571 out_set:
572         lov_fini_cancel_set(set);
573         RETURN(rc);
574 }
575 static int common_attr_done(struct lov_request_set *set)
576 {
577         cfs_list_t *pos;
578         struct lov_request *req;
579         struct obdo *tmp_oa;
580         int rc = 0, attrset = 0;
581         ENTRY;
582
583         LASSERT(set->set_oi != NULL);
584
585         if (set->set_oi->oi_oa == NULL)
586                 RETURN(0);
587
588         if (!cfs_atomic_read(&set->set_success))
589                 RETURN(-EIO);
590
591         OBDO_ALLOC(tmp_oa);
592         if (tmp_oa == NULL)
593                 GOTO(out, rc = -ENOMEM);
594
595         cfs_list_for_each (pos, &set->set_list) {
596                 req = cfs_list_entry(pos, struct lov_request, rq_link);
597
598                 if (!req->rq_complete || req->rq_rc)
599                         continue;
600                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
601                         continue;
602                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
603                                 req->rq_oi.oi_oa->o_valid,
604                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
605         }
606         if (!attrset) {
607                 CERROR("No stripes had valid attrs\n");
608                 rc = -EIO;
609         }
610         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
611             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
612                 /* When we take attributes of some epoch, we require all the
613                  * ost to be active. */
614                 CERROR("Not all the stripes had valid attrs\n");
615                 GOTO(out, rc = -EIO);
616         }
617
618         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
619         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
620 out:
621         if (tmp_oa)
622                 OBDO_FREE(tmp_oa);
623         RETURN(rc);
624
625 }
626
627 static int brw_done(struct lov_request_set *set)
628 {
629         struct lov_stripe_md *lsm = set->set_oi->oi_md;
630         struct lov_oinfo     *loi = NULL;
631         cfs_list_t *pos;
632         struct lov_request *req;
633         ENTRY;
634
635         cfs_list_for_each (pos, &set->set_list) {
636                 req = cfs_list_entry(pos, struct lov_request, rq_link);
637
638                 if (!req->rq_complete || req->rq_rc)
639                         continue;
640
641                 loi = lsm->lsm_oinfo[req->rq_stripe];
642
643                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
644                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
645         }
646
647         RETURN(0);
648 }
649
650 int lov_fini_brw_set(struct lov_request_set *set)
651 {
652         int rc = 0;
653         ENTRY;
654
655         if (set == NULL)
656                 RETURN(0);
657         LASSERT(set->set_exp);
658         if (cfs_atomic_read(&set->set_completes)) {
659                 rc = brw_done(set);
660                 /* FIXME update qos data here */
661         }
662         lov_put_reqset(set);
663
664         RETURN(rc);
665 }
666
667 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
668                      obd_count oa_bufs, struct brw_page *pga,
669                      struct obd_trans_info *oti,
670                      struct lov_request_set **reqset)
671 {
672         struct {
673                 obd_count       index;
674                 obd_count       count;
675                 obd_count       off;
676         } *info = NULL;
677         struct lov_request_set *set;
678         struct lov_obd *lov = &exp->exp_obd->u.lov;
679         int rc = 0, i, shift;
680         ENTRY;
681
682         OBD_ALLOC(set, sizeof(*set));
683         if (set == NULL)
684                 RETURN(-ENOMEM);
685         lov_init_set(set);
686
687         set->set_exp = exp;
688         set->set_oti = oti;
689         set->set_oi = oinfo;
690         set->set_oabufs = oa_bufs;
691         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
692         if (!set->set_pga)
693                 GOTO(out, rc = -ENOMEM);
694
695         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
696         if (!info)
697                 GOTO(out, rc = -ENOMEM);
698
699         /* calculate the page count for each stripe */
700         for (i = 0; i < oa_bufs; i++) {
701                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
702                 info[stripe].count++;
703         }
704
705         /* alloc and initialize lov request */
706         shift = 0;
707         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
708                 struct lov_oinfo *loi = NULL;
709                 struct lov_request *req;
710
711                 if (info[i].count == 0)
712                         continue;
713
714                 loi = oinfo->oi_md->lsm_oinfo[i];
715                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
716                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
717                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
718                         GOTO(out, rc = -EIO);
719                 }
720
721                 OBD_ALLOC(req, sizeof(*req));
722                 if (req == NULL)
723                         GOTO(out, rc = -ENOMEM);
724
725                 OBDO_ALLOC(req->rq_oi.oi_oa);
726                 if (req->rq_oi.oi_oa == NULL) {
727                         OBD_FREE(req, sizeof(*req));
728                         GOTO(out, rc = -ENOMEM);
729                 }
730
731                 if (oinfo->oi_oa) {
732                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
733                                sizeof(*req->rq_oi.oi_oa));
734                 }
735                 req->rq_oi.oi_oa->o_id = loi->loi_id;
736                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
737                 req->rq_oi.oi_oa->o_stripe_idx = i;
738
739                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
740                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
741                 if (req->rq_oi.oi_md == NULL) {
742                         OBDO_FREE(req->rq_oi.oi_oa);
743                         OBD_FREE(req, sizeof(*req));
744                         GOTO(out, rc = -ENOMEM);
745                 }
746
747                 req->rq_idx = loi->loi_ost_idx;
748                 req->rq_stripe = i;
749
750                 /* XXX LOV STACKING */
751                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
752                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
753                 req->rq_oabufs = info[i].count;
754                 req->rq_pgaidx = shift;
755                 shift += req->rq_oabufs;
756
757                 /* remember the index for sort brw_page array */
758                 info[i].index = req->rq_pgaidx;
759
760                 req->rq_oi.oi_capa = oinfo->oi_capa;
761
762                 lov_set_add_req(req, set);
763         }
764         if (!set->set_count)
765                 GOTO(out, rc = -EIO);
766
767         /* rotate & sort the brw_page array */
768         for (i = 0; i < oa_bufs; i++) {
769                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
770
771                 shift = info[stripe].index + info[stripe].off;
772                 LASSERT(shift < oa_bufs);
773                 set->set_pga[shift] = pga[i];
774                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
775                                   &set->set_pga[shift].off);
776                 info[stripe].off++;
777         }
778 out:
779         if (info)
780                 OBD_FREE_LARGE(info,
781                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
782
783         if (rc == 0)
784                 *reqset = set;
785         else
786                 lov_fini_brw_set(set);
787
788         RETURN(rc);
789 }
790
791 int lov_fini_getattr_set(struct lov_request_set *set)
792 {
793         int rc = 0;
794         ENTRY;
795
796         if (set == NULL)
797                 RETURN(0);
798         LASSERT(set->set_exp);
799         if (cfs_atomic_read(&set->set_completes))
800                 rc = common_attr_done(set);
801
802         lov_put_reqset(set);
803
804         RETURN(rc);
805 }
806
807 /* The callback for osc_getattr_async that finilizes a request info when a
808  * response is received. */
809 static int cb_getattr_update(void *cookie, int rc)
810 {
811         struct obd_info *oinfo = cookie;
812         struct lov_request *lovreq;
813         lovreq = container_of(oinfo, struct lov_request, rq_oi);
814         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
815 }
816
817 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
818                          struct lov_request_set **reqset)
819 {
820         struct lov_request_set *set;
821         struct lov_obd *lov = &exp->exp_obd->u.lov;
822         int rc = 0, i;
823         ENTRY;
824
825         OBD_ALLOC(set, sizeof(*set));
826         if (set == NULL)
827                 RETURN(-ENOMEM);
828         lov_init_set(set);
829
830         set->set_exp = exp;
831         set->set_oi = oinfo;
832
833         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
834                 struct lov_oinfo *loi;
835                 struct lov_request *req;
836
837                 loi = oinfo->oi_md->lsm_oinfo[i];
838                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
839                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
840                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
841                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
842                                 /* SOM requires all the OSTs to be active. */
843                                 GOTO(out_set, rc = -EIO);
844                         continue;
845                 }
846
847                 OBD_ALLOC(req, sizeof(*req));
848                 if (req == NULL)
849                         GOTO(out_set, rc = -ENOMEM);
850
851                 req->rq_stripe = i;
852                 req->rq_idx = loi->loi_ost_idx;
853
854                 OBDO_ALLOC(req->rq_oi.oi_oa);
855                 if (req->rq_oi.oi_oa == NULL) {
856                         OBD_FREE(req, sizeof(*req));
857                         GOTO(out_set, rc = -ENOMEM);
858                 }
859                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
860                        sizeof(*req->rq_oi.oi_oa));
861                 req->rq_oi.oi_oa->o_id = loi->loi_id;
862                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
863                 req->rq_oi.oi_cb_up = cb_getattr_update;
864                 req->rq_oi.oi_capa = oinfo->oi_capa;
865
866                 lov_set_add_req(req, set);
867         }
868         if (!set->set_count)
869                 GOTO(out_set, rc = -EIO);
870         *reqset = set;
871         RETURN(rc);
872 out_set:
873         lov_fini_getattr_set(set);
874         RETURN(rc);
875 }
876
877 int lov_fini_destroy_set(struct lov_request_set *set)
878 {
879         ENTRY;
880
881         if (set == NULL)
882                 RETURN(0);
883         LASSERT(set->set_exp);
884         if (cfs_atomic_read(&set->set_completes)) {
885                 /* FIXME update qos data here */
886         }
887
888         lov_put_reqset(set);
889
890         RETURN(0);
891 }
892
893 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
894                          struct obdo *src_oa, struct lov_stripe_md *lsm,
895                          struct obd_trans_info *oti,
896                          struct lov_request_set **reqset)
897 {
898         struct lov_request_set *set;
899         struct lov_obd *lov = &exp->exp_obd->u.lov;
900         int rc = 0, i;
901         ENTRY;
902
903         OBD_ALLOC(set, sizeof(*set));
904         if (set == NULL)
905                 RETURN(-ENOMEM);
906         lov_init_set(set);
907
908         set->set_exp = exp;
909         set->set_oi = oinfo;
910         set->set_oi->oi_md = lsm;
911         set->set_oi->oi_oa = src_oa;
912         set->set_oti = oti;
913         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
914                 set->set_cookies = oti->oti_logcookies;
915
916         for (i = 0; i < lsm->lsm_stripe_count; i++) {
917                 struct lov_oinfo *loi;
918                 struct lov_request *req;
919
920                 loi = lsm->lsm_oinfo[i];
921                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
922                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
923                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
924                         continue;
925                 }
926
927                 OBD_ALLOC(req, sizeof(*req));
928                 if (req == NULL)
929                         GOTO(out_set, rc = -ENOMEM);
930
931                 req->rq_stripe = i;
932                 req->rq_idx = loi->loi_ost_idx;
933
934                 OBDO_ALLOC(req->rq_oi.oi_oa);
935                 if (req->rq_oi.oi_oa == NULL) {
936                         OBD_FREE(req, sizeof(*req));
937                         GOTO(out_set, rc = -ENOMEM);
938                 }
939                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
940                 req->rq_oi.oi_oa->o_id = loi->loi_id;
941                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
942                 lov_set_add_req(req, set);
943         }
944         if (!set->set_count)
945                 GOTO(out_set, rc = -EIO);
946         *reqset = set;
947         RETURN(rc);
948 out_set:
949         lov_fini_destroy_set(set);
950         RETURN(rc);
951 }
952
953 int lov_fini_setattr_set(struct lov_request_set *set)
954 {
955         int rc = 0;
956         ENTRY;
957
958         if (set == NULL)
959                 RETURN(0);
960         LASSERT(set->set_exp);
961         if (cfs_atomic_read(&set->set_completes)) {
962                 rc = common_attr_done(set);
963                 /* FIXME update qos data here */
964         }
965
966         lov_put_reqset(set);
967         RETURN(rc);
968 }
969
970 int lov_update_setattr_set(struct lov_request_set *set,
971                            struct lov_request *req, int rc)
972 {
973         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
974         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
975         ENTRY;
976
977         lov_update_set(set, req, rc);
978
979         /* grace error on inactive ost */
980         if (rc && !(lov->lov_tgts[req->rq_idx] &&
981                     lov->lov_tgts[req->rq_idx]->ltd_active))
982                 rc = 0;
983
984         if (rc == 0) {
985                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
986                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
987                                 req->rq_oi.oi_oa->o_ctime;
988                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
989                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
990                                 req->rq_oi.oi_oa->o_mtime;
991                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
992                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
993                                 req->rq_oi.oi_oa->o_atime;
994         }
995
996         RETURN(rc);
997 }
998
999 /* The callback for osc_setattr_async that finilizes a request info when a
1000  * response is received. */
1001 static int cb_setattr_update(void *cookie, int rc)
1002 {
1003         struct obd_info *oinfo = cookie;
1004         struct lov_request *lovreq;
1005         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1006         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1007 }
1008
1009 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1010                          struct obd_trans_info *oti,
1011                          struct lov_request_set **reqset)
1012 {
1013         struct lov_request_set *set;
1014         struct lov_obd *lov = &exp->exp_obd->u.lov;
1015         int rc = 0, i;
1016         ENTRY;
1017
1018         OBD_ALLOC(set, sizeof(*set));
1019         if (set == NULL)
1020                 RETURN(-ENOMEM);
1021         lov_init_set(set);
1022
1023         set->set_exp = exp;
1024         set->set_oti = oti;
1025         set->set_oi = oinfo;
1026         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1027                 set->set_cookies = oti->oti_logcookies;
1028
1029         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1030                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1031                 struct lov_request *req;
1032
1033                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1034                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1035                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1036                         continue;
1037                 }
1038
1039                 OBD_ALLOC(req, sizeof(*req));
1040                 if (req == NULL)
1041                         GOTO(out_set, rc = -ENOMEM);
1042                 req->rq_stripe = i;
1043                 req->rq_idx = loi->loi_ost_idx;
1044
1045                 OBDO_ALLOC(req->rq_oi.oi_oa);
1046                 if (req->rq_oi.oi_oa == NULL) {
1047                         OBD_FREE(req, sizeof(*req));
1048                         GOTO(out_set, rc = -ENOMEM);
1049                 }
1050                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1051                        sizeof(*req->rq_oi.oi_oa));
1052                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1053                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1054                 req->rq_oi.oi_oa->o_stripe_idx = i;
1055                 req->rq_oi.oi_cb_up = cb_setattr_update;
1056                 req->rq_oi.oi_capa = oinfo->oi_capa;
1057
1058                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1059                         int off = lov_stripe_offset(oinfo->oi_md,
1060                                                     oinfo->oi_oa->o_size, i,
1061                                                     &req->rq_oi.oi_oa->o_size);
1062
1063                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1064                                 req->rq_oi.oi_oa->o_size--;
1065
1066                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1067                                i, req->rq_oi.oi_oa->o_size,
1068                                oinfo->oi_oa->o_size);
1069                 }
1070                 lov_set_add_req(req, set);
1071         }
1072         if (!set->set_count)
1073                 GOTO(out_set, rc = -EIO);
1074         *reqset = set;
1075         RETURN(rc);
1076 out_set:
1077         lov_fini_setattr_set(set);
1078         RETURN(rc);
1079 }
1080
1081 int lov_fini_punch_set(struct lov_request_set *set)
1082 {
1083         int rc = 0;
1084         ENTRY;
1085
1086         if (set == NULL)
1087                 RETURN(0);
1088         LASSERT(set->set_exp);
1089         if (cfs_atomic_read(&set->set_completes)) {
1090                 rc = -EIO;
1091                 /* FIXME update qos data here */
1092                 if (cfs_atomic_read(&set->set_success))
1093                         rc = common_attr_done(set);
1094         }
1095
1096         lov_put_reqset(set);
1097
1098         RETURN(rc);
1099 }
1100
1101 int lov_update_punch_set(struct lov_request_set *set,
1102                          struct lov_request *req, int rc)
1103 {
1104         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1105         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1106         ENTRY;
1107
1108         lov_update_set(set, req, rc);
1109
1110         /* grace error on inactive ost */
1111         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1112                 rc = 0;
1113
1114         if (rc == 0) {
1115                 lov_stripe_lock(lsm);
1116                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1117                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1118                                 req->rq_oi.oi_oa->o_blocks;
1119                 }
1120
1121                 lov_stripe_unlock(lsm);
1122         }
1123
1124         RETURN(rc);
1125 }
1126
1127 /* The callback for osc_punch that finilizes a request info when a response
1128  * is received. */
1129 static int cb_update_punch(void *cookie, int rc)
1130 {
1131         struct obd_info *oinfo = cookie;
1132         struct lov_request *lovreq;
1133         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1134         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1135 }
1136
1137 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1138                        struct obd_trans_info *oti,
1139                        struct lov_request_set **reqset)
1140 {
1141         struct lov_request_set *set;
1142         struct lov_obd *lov = &exp->exp_obd->u.lov;
1143         int rc = 0, i;
1144         ENTRY;
1145
1146         OBD_ALLOC(set, sizeof(*set));
1147         if (set == NULL)
1148                 RETURN(-ENOMEM);
1149         lov_init_set(set);
1150
1151         set->set_oi = oinfo;
1152         set->set_exp = exp;
1153
1154         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1155                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1156                 struct lov_request *req;
1157                 obd_off rs, re;
1158
1159                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1160                                            oinfo->oi_policy.l_extent.start,
1161                                            oinfo->oi_policy.l_extent.end,
1162                                            &rs, &re))
1163                         continue;
1164
1165                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1166                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1167                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1168                         GOTO(out_set, rc = -EIO);
1169                 }
1170
1171                 OBD_ALLOC(req, sizeof(*req));
1172                 if (req == NULL)
1173                         GOTO(out_set, rc = -ENOMEM);
1174                 req->rq_stripe = i;
1175                 req->rq_idx = loi->loi_ost_idx;
1176
1177                 OBDO_ALLOC(req->rq_oi.oi_oa);
1178                 if (req->rq_oi.oi_oa == NULL) {
1179                         OBD_FREE(req, sizeof(*req));
1180                         GOTO(out_set, rc = -ENOMEM);
1181                 }
1182                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1183                        sizeof(*req->rq_oi.oi_oa));
1184                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1185                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1186                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1187
1188                 req->rq_oi.oi_oa->o_stripe_idx = i;
1189                 req->rq_oi.oi_cb_up = cb_update_punch;
1190
1191                 req->rq_oi.oi_policy.l_extent.start = rs;
1192                 req->rq_oi.oi_policy.l_extent.end = re;
1193                 req->rq_oi.oi_policy.l_extent.gid = -1;
1194
1195                 req->rq_oi.oi_capa = oinfo->oi_capa;
1196
1197                 lov_set_add_req(req, set);
1198         }
1199         if (!set->set_count)
1200                 GOTO(out_set, rc = -EIO);
1201         *reqset = set;
1202         RETURN(rc);
1203 out_set:
1204         lov_fini_punch_set(set);
1205         RETURN(rc);
1206 }
1207
1208 int lov_fini_sync_set(struct lov_request_set *set)
1209 {
1210         int rc = 0;
1211         ENTRY;
1212
1213         if (set == NULL)
1214                 RETURN(0);
1215         LASSERT(set->set_exp);
1216         if (cfs_atomic_read(&set->set_completes)) {
1217                 if (!cfs_atomic_read(&set->set_success))
1218                         rc = -EIO;
1219                 /* FIXME update qos data here */
1220         }
1221
1222         lov_put_reqset(set);
1223
1224         RETURN(rc);
1225 }
1226
1227 /* The callback for osc_sync that finilizes a request info when a
1228  * response is recieved. */
1229 static int cb_sync_update(void *cookie, int rc)
1230 {
1231         struct obd_info *oinfo = cookie;
1232         struct lov_request *lovreq;
1233
1234         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1235         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1236 }
1237
1238 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1239                       obd_off start, obd_off end,
1240                       struct lov_request_set **reqset)
1241 {
1242         struct lov_request_set *set;
1243         struct lov_obd *lov = &exp->exp_obd->u.lov;
1244         int rc = 0, i;
1245         ENTRY;
1246
1247         OBD_ALLOC_PTR(set);
1248         if (set == NULL)
1249                 RETURN(-ENOMEM);
1250         lov_init_set(set);
1251
1252         set->set_exp = exp;
1253         set->set_oi = oinfo;
1254
1255         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1256                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1257                 struct lov_request *req;
1258                 obd_off rs, re;
1259
1260                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1261                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1262                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1263                         continue;
1264                 }
1265
1266                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1267                                            &re))
1268                         continue;
1269
1270                 OBD_ALLOC_PTR(req);
1271                 if (req == NULL)
1272                         GOTO(out_set, rc = -ENOMEM);
1273                 req->rq_stripe = i;
1274                 req->rq_idx = loi->loi_ost_idx;
1275
1276                 OBDO_ALLOC(req->rq_oi.oi_oa);
1277                 if (req->rq_oi.oi_oa == NULL) {
1278                         OBD_FREE(req, sizeof(*req));
1279                         GOTO(out_set, rc = -ENOMEM);
1280                 }
1281                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1282                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1283                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1284                 req->rq_oi.oi_oa->o_stripe_idx = i;
1285
1286                 req->rq_oi.oi_policy.l_extent.start = rs;
1287                 req->rq_oi.oi_policy.l_extent.end = re;
1288                 req->rq_oi.oi_policy.l_extent.gid = -1;
1289                 req->rq_oi.oi_cb_up = cb_sync_update;
1290
1291                 lov_set_add_req(req, set);
1292         }
1293         if (!set->set_count)
1294                 GOTO(out_set, rc = -EIO);
1295         *reqset = set;
1296         RETURN(rc);
1297 out_set:
1298         lov_fini_sync_set(set);
1299         RETURN(rc);
1300 }
1301
/* All-ones 64-bit value, used as the saturation limit below. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: add "add" to "tot", pinning "tot" at LOV_U64_MAX
 * when the unsigned sum wraps around.  Both arguments are evaluated more
 * than once, so they must not have side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1310
1311 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1312 {
1313         ENTRY;
1314
1315         if (success) {
1316                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1317                                                            LOV_MAGIC, 0);
1318                 if (osfs->os_files != LOV_U64_MAX)
1319                         lov_do_div64(osfs->os_files, expected_stripes);
1320                 if (osfs->os_ffree != LOV_U64_MAX)
1321                         lov_do_div64(osfs->os_ffree, expected_stripes);
1322
1323                 spin_lock(&obd->obd_osfs_lock);
1324                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1325                 obd->obd_osfs_age = cfs_time_current_64();
1326                 spin_unlock(&obd->obd_osfs_lock);
1327                 RETURN(0);
1328         }
1329
1330         RETURN(-EIO);
1331 }
1332
1333 int lov_fini_statfs_set(struct lov_request_set *set)
1334 {
1335         int rc = 0;
1336         ENTRY;
1337
1338         if (set == NULL)
1339                 RETURN(0);
1340
1341         if (cfs_atomic_read(&set->set_completes)) {
1342                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1343                                      cfs_atomic_read(&set->set_success));
1344         }
1345         lov_put_reqset(set);
1346         RETURN(rc);
1347 }
1348
1349 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1350                        int success)
1351 {
1352         int shift = 0, quit = 0;
1353         __u64 tmp;
1354
1355         if (success == 0) {
1356                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1357         } else {
1358                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1359                         /* assume all block sizes are always powers of 2 */
1360                         /* get the bits difference */
1361                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1362                         for (shift = 0; shift <= 64; ++shift) {
1363                                 if (tmp & 1) {
1364                                         if (quit)
1365                                                 break;
1366                                         else
1367                                                 quit = 1;
1368                                         shift = 0;
1369                                 }
1370                                 tmp >>= 1;
1371                         }
1372                 }
1373
1374                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1375                         osfs->os_bsize = lov_sfs->os_bsize;
1376
1377                         osfs->os_bfree  >>= shift;
1378                         osfs->os_bavail >>= shift;
1379                         osfs->os_blocks >>= shift;
1380                 } else if (shift != 0) {
1381                         lov_sfs->os_bfree  >>= shift;
1382                         lov_sfs->os_bavail >>= shift;
1383                         lov_sfs->os_blocks >>= shift;
1384                 }
1385 #ifdef MIN_DF
1386                 /* Sandia requested that df (and so, statfs) only
1387                    returned minimal available space on
1388                    a single OST, so people would be able to
1389                    write this much data guaranteed. */
1390                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1391                         /* Presumably if new bavail is smaller,
1392                            new bfree is bigger as well */
1393                         osfs->os_bfree = lov_sfs->os_bfree;
1394                         osfs->os_bavail = lov_sfs->os_bavail;
1395                 }
1396 #else
1397                 osfs->os_bfree += lov_sfs->os_bfree;
1398                 osfs->os_bavail += lov_sfs->os_bavail;
1399 #endif
1400                 osfs->os_blocks += lov_sfs->os_blocks;
1401                 /* XXX not sure about this one - depends on policy.
1402                  *   - could be minimum if we always stripe on all OBDs
1403                  *     (but that would be wrong for any other policy,
1404                  *     if one of the OBDs has no more objects left)
1405                  *   - could be sum if we stripe whole objects
1406                  *   - could be average, just to give a nice number
1407                  *
1408                  * To give a "reasonable" (if not wholly accurate)
1409                  * number, we divide the total number of free objects
1410                  * by expected stripe count (watch out for overflow).
1411                  */
1412                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1413                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1414         }
1415 }
1416
1417 /* The callback for osc_statfs_async that finilizes a request info when a
1418  * response is received. */
1419 static int cb_statfs_update(void *cookie, int rc)
1420 {
1421         struct obd_info *oinfo = cookie;
1422         struct lov_request *lovreq;
1423         struct lov_request_set *set;
1424         struct obd_statfs *osfs, *lov_sfs;
1425         struct lov_obd *lov;
1426         struct lov_tgt_desc *tgt;
1427         struct obd_device *lovobd, *tgtobd;
1428         int success;
1429         ENTRY;
1430
1431         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1432         set = lovreq->rq_rqset;
1433         lovobd = set->set_obd;
1434         lov = &lovobd->u.lov;
1435         osfs = set->set_oi->oi_osfs;
1436         lov_sfs = oinfo->oi_osfs;
1437         success = cfs_atomic_read(&set->set_success);
1438         /* XXX: the same is done in lov_update_common_set, however
1439            lovset->set_exp is not initialized. */
1440         lov_update_set(set, lovreq, rc);
1441         if (rc)
1442                 GOTO(out, rc);
1443
1444         obd_getref(lovobd);
1445         tgt = lov->lov_tgts[lovreq->rq_idx];
1446         if (!tgt || !tgt->ltd_active)
1447                 GOTO(out_update, rc);
1448
1449         tgtobd = class_exp2obd(tgt->ltd_exp);
1450         spin_lock(&tgtobd->obd_osfs_lock);
1451         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1452         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1453                 tgtobd->obd_osfs_age = cfs_time_current_64();
1454         spin_unlock(&tgtobd->obd_osfs_lock);
1455
1456 out_update:
1457         lov_update_statfs(osfs, lov_sfs, success);
1458         obd_putref(lovobd);
1459
1460 out:
1461         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1462             lov_set_finished(set, 0)) {
1463                 lov_statfs_interpret(NULL, set, set->set_count !=
1464                                      cfs_atomic_read(&set->set_success));
1465         }
1466
1467         RETURN(0);
1468 }
1469
1470 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1471                         struct lov_request_set **reqset)
1472 {
1473         struct lov_request_set *set;
1474         struct lov_obd *lov = &obd->u.lov;
1475         int rc = 0, i;
1476         ENTRY;
1477
1478         OBD_ALLOC(set, sizeof(*set));
1479         if (set == NULL)
1480                 RETURN(-ENOMEM);
1481         lov_init_set(set);
1482
1483         set->set_obd = obd;
1484         set->set_oi = oinfo;
1485
1486         /* We only get block data from the OBD */
1487         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1488                 struct lov_request *req;
1489
1490                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1491                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1492                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1493                         continue;
1494                 }
1495
1496                 /* skip targets that have been explicitely disabled by the
1497                  * administrator */
1498                 if (!lov->lov_tgts[i]->ltd_exp) {
1499                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1500                         continue;
1501                 }
1502
1503                 OBD_ALLOC(req, sizeof(*req));
1504                 if (req == NULL)
1505                         GOTO(out_set, rc = -ENOMEM);
1506
1507                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1508                 if (req->rq_oi.oi_osfs == NULL) {
1509                         OBD_FREE(req, sizeof(*req));
1510                         GOTO(out_set, rc = -ENOMEM);
1511                 }
1512
1513                 req->rq_idx = i;
1514                 req->rq_oi.oi_cb_up = cb_statfs_update;
1515                 req->rq_oi.oi_flags = oinfo->oi_flags;
1516
1517                 lov_set_add_req(req, set);
1518         }
1519         if (!set->set_count)
1520                 GOTO(out_set, rc = -EIO);
1521         *reqset = set;
1522         RETURN(rc);
1523 out_set:
1524         lov_fini_statfs_set(set);
1525         RETURN(rc);
1526 }