Whamcloud - gitweb
LU-365 Update copyright for files modified by Whamcloud
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #ifndef EXPORT_SYMTAB
41 # define EXPORT_SYMTAB
42 #endif
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #ifdef __KERNEL__
46 #include <libcfs/libcfs.h>
47 #else
48 #include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <obd_lov.h>
53 #include <lustre/lustre_idl.h>
54
55 #include "lov_internal.h"
56
57 static void lov_init_set(struct lov_request_set *set)
58 {
59         set->set_count = 0;
60         set->set_completes = 0;
61         set->set_success = 0;
62         set->set_cookies = 0;
63         CFS_INIT_LIST_HEAD(&set->set_list);
64         cfs_atomic_set(&set->set_refcount, 1);
65         cfs_waitq_init(&set->set_waitq);
66         cfs_spin_lock_init(&set->set_lock);
67 }
68
69 void lov_finish_set(struct lov_request_set *set)
70 {
71         cfs_list_t *pos, *n;
72         ENTRY;
73
74         LASSERT(set);
75         cfs_list_for_each_safe(pos, n, &set->set_list) {
76                 struct lov_request *req = cfs_list_entry(pos,
77                                                          struct lov_request,
78                                                          rq_link);
79                 cfs_list_del_init(&req->rq_link);
80
81                 if (req->rq_oi.oi_oa)
82                         OBDO_FREE(req->rq_oi.oi_oa);
83                 if (req->rq_oi.oi_md)
84                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
85                 if (req->rq_oi.oi_osfs)
86                         OBD_FREE(req->rq_oi.oi_osfs,
87                                  sizeof(*req->rq_oi.oi_osfs));
88                 OBD_FREE(req, sizeof(*req));
89         }
90
91         if (set->set_pga) {
92                 int len = set->set_oabufs * sizeof(*set->set_pga);
93                 OBD_FREE_LARGE(set->set_pga, len);
94         }
95         if (set->set_lockh)
96                 lov_llh_put(set->set_lockh);
97
98         OBD_FREE(set, sizeof(*set));
99         EXIT;
100 }
101
102 int lov_finished_set(struct lov_request_set *set)
103 {
104         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
105                set->set_count);
106         return set->set_completes == set->set_count;
107 }
108
109 void lov_update_set(struct lov_request_set *set,
110                     struct lov_request *req, int rc)
111 {
112         req->rq_complete = 1;
113         req->rq_rc = rc;
114
115         set->set_completes++;
116         if (rc == 0)
117                 set->set_success++;
118
119         cfs_waitq_signal(&set->set_waitq);
120 }
121
122 int lov_update_common_set(struct lov_request_set *set,
123                           struct lov_request *req, int rc)
124 {
125         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
126         ENTRY;
127
128         lov_update_set(set, req, rc);
129
130         /* grace error on inactive ost */
131         if (rc && !(lov->lov_tgts[req->rq_idx] &&
132                     lov->lov_tgts[req->rq_idx]->ltd_active))
133                 rc = 0;
134
135         /* FIXME in raid1 regime, should return 0 */
136         RETURN(rc);
137 }
138
139 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
140 {
141         cfs_list_add_tail(&req->rq_link, &set->set_list);
142         set->set_count++;
143         req->rq_rqset = set;
144 }
145
146 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
147                                struct lov_oinfo *loi, int flags,
148                                struct ost_lvb *lvb, __u32 mode, int rc);
149
150 static int lov_update_enqueue_lov(struct obd_export *exp,
151                                   struct lustre_handle *lov_lockhp,
152                                   struct lov_oinfo *loi, int flags, int idx,
153                                   __u64 oid, int rc)
154 {
155         struct lov_obd *lov = &exp->exp_obd->u.lov;
156
157         if (rc != ELDLM_OK &&
158             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
159                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
160                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
161                         /* -EUSERS used by OST to report file contention */
162                         if (rc != -EINTR && rc != -EUSERS)
163                                 CERROR("enqueue objid "LPX64" subobj "
164                                        LPX64" on OST idx %d: rc %d\n",
165                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
166                 } else
167                         rc = ELDLM_OK;
168         }
169         return rc;
170 }
171
172 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
173 {
174         struct lov_request_set *set = req->rq_rqset;
175         struct lustre_handle *lov_lockhp;
176         struct obd_info *oi = set->set_oi;
177         struct lov_oinfo *loi;
178         ENTRY;
179
180         LASSERT(oi != NULL);
181
182         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
183         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
184
185         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
186          * and that copy can be arbitrarily out of date.
187          *
188          * The LOV API is due for a serious rewriting anyways, and this
189          * can be addressed then. */
190
191         lov_stripe_lock(oi->oi_md);
192         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
193                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
194         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
195                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
196         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
197                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
198         lov_stripe_unlock(oi->oi_md);
199         lov_update_set(set, req, rc);
200         RETURN(rc);
201 }
202
203 /* The callback for osc_enqueue that updates lov info for every OSC request. */
204 static int cb_update_enqueue(void *cookie, int rc)
205 {
206         struct obd_info *oinfo = cookie;
207         struct ldlm_enqueue_info *einfo;
208         struct lov_request *lovreq;
209
210         lovreq = container_of(oinfo, struct lov_request, rq_oi);
211         einfo = lovreq->rq_rqset->set_ei;
212         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
213 }
214
215 static int enqueue_done(struct lov_request_set *set, __u32 mode)
216 {
217         struct lov_request *req;
218         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
219         int rc = 0;
220         ENTRY;
221
222         /* enqueue/match success, just return */
223         if (set->set_completes && set->set_completes == set->set_success)
224                 RETURN(0);
225
226         /* cancel enqueued/matched locks */
227         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
228                 struct lustre_handle *lov_lockhp;
229
230                 if (!req->rq_complete || req->rq_rc)
231                         continue;
232
233                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
234                 LASSERT(lov_lockhp);
235                 if (!lustre_handle_is_used(lov_lockhp))
236                         continue;
237
238                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
239                                 req->rq_oi.oi_md, mode, lov_lockhp);
240                 if (rc && lov->lov_tgts[req->rq_idx] &&
241                     lov->lov_tgts[req->rq_idx]->ltd_active)
242                         CERROR("cancelling obdjid "LPX64" on OST "
243                                "idx %d error: rc = %d\n",
244                                req->rq_oi.oi_md->lsm_object_id,
245                                req->rq_idx, rc);
246         }
247         if (set->set_lockh)
248                 lov_llh_put(set->set_lockh);
249         RETURN(rc);
250 }
251
252 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
253                          struct ptlrpc_request_set *rqset)
254 {
255         int ret = 0;
256         ENTRY;
257
258         if (set == NULL)
259                 RETURN(0);
260         LASSERT(set->set_exp);
261         /* Do enqueue_done only for sync requests and if any request
262          * succeeded. */
263         if (!rqset) {
264                 if (rc)
265                         set->set_completes = 0;
266                 ret = enqueue_done(set, mode);
267         } else if (set->set_lockh)
268                 lov_llh_put(set->set_lockh);
269
270         lov_put_reqset(set);
271
272         RETURN(rc ? rc : ret);
273 }
274
275 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
276                          struct ldlm_enqueue_info *einfo,
277                          struct lov_request_set **reqset)
278 {
279         struct lov_obd *lov = &exp->exp_obd->u.lov;
280         struct lov_request_set *set;
281         int i, rc = 0;
282         ENTRY;
283
284         OBD_ALLOC(set, sizeof(*set));
285         if (set == NULL)
286                 RETURN(-ENOMEM);
287         lov_init_set(set);
288
289         set->set_exp = exp;
290         set->set_oi = oinfo;
291         set->set_ei = einfo;
292         set->set_lockh = lov_llh_new(oinfo->oi_md);
293         if (set->set_lockh == NULL)
294                 GOTO(out_set, rc = -ENOMEM);
295         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
296
297         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
298                 struct lov_oinfo *loi;
299                 struct lov_request *req;
300                 obd_off start, end;
301
302                 loi = oinfo->oi_md->lsm_oinfo[i];
303                 if (!lov_stripe_intersects(oinfo->oi_md, i,
304                                            oinfo->oi_policy.l_extent.start,
305                                            oinfo->oi_policy.l_extent.end,
306                                            &start, &end))
307                         continue;
308
309                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
310                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
311                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
312                         continue;
313                 }
314
315                 OBD_ALLOC(req, sizeof(*req));
316                 if (req == NULL)
317                         GOTO(out_set, rc = -ENOMEM);
318
319                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
320                         sizeof(struct lov_oinfo *) +
321                         sizeof(struct lov_oinfo);
322                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
323                 if (req->rq_oi.oi_md == NULL) {
324                         OBD_FREE(req, sizeof(*req));
325                         GOTO(out_set, rc = -ENOMEM);
326                 }
327                 req->rq_oi.oi_md->lsm_oinfo[0] =
328                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
329                         sizeof(struct lov_oinfo *);
330
331                 /* Set lov request specific parameters. */
332                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
333                 req->rq_oi.oi_cb_up = cb_update_enqueue;
334                 req->rq_oi.oi_flags = oinfo->oi_flags;
335
336                 LASSERT(req->rq_oi.oi_lockh);
337
338                 req->rq_oi.oi_policy.l_extent.gid =
339                         oinfo->oi_policy.l_extent.gid;
340                 req->rq_oi.oi_policy.l_extent.start = start;
341                 req->rq_oi.oi_policy.l_extent.end = end;
342
343                 req->rq_idx = loi->loi_ost_idx;
344                 req->rq_stripe = i;
345
346                 /* XXX LOV STACKING: submd should be from the subobj */
347                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
348                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
349                 req->rq_oi.oi_md->lsm_stripe_count = 0;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
351                         loi->loi_kms_valid;
352                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
353                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
354
355                 lov_set_add_req(req, set);
356         }
357         if (!set->set_count)
358                 GOTO(out_set, rc = -EIO);
359         *reqset = set;
360         RETURN(0);
361 out_set:
362         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
363         RETURN(rc);
364 }
365
366 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
367                          int rc)
368 {
369         int ret = rc;
370         ENTRY;
371
372         if (rc > 0)
373                 ret = 0;
374         else if (rc == 0)
375                 ret = 1;
376         lov_update_set(set, req, ret);
377         RETURN(rc);
378 }
379
380 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
381 {
382         int rc = 0;
383         ENTRY;
384
385         if (set == NULL)
386                 RETURN(0);
387         LASSERT(set->set_exp);
388         rc = enqueue_done(set, mode);
389         if ((set->set_count == set->set_success) &&
390             (flags & LDLM_FL_TEST_LOCK))
391                 lov_llh_put(set->set_lockh);
392
393         lov_put_reqset(set);
394
395         RETURN(rc);
396 }
397
398 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
399                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
400                        __u32 mode, struct lustre_handle *lockh,
401                        struct lov_request_set **reqset)
402 {
403         struct lov_obd *lov = &exp->exp_obd->u.lov;
404         struct lov_request_set *set;
405         int i, rc = 0;
406         ENTRY;
407
408         OBD_ALLOC(set, sizeof(*set));
409         if (set == NULL)
410                 RETURN(-ENOMEM);
411         lov_init_set(set);
412
413         set->set_exp = exp;
414         set->set_oi = oinfo;
415         set->set_oi->oi_md = lsm;
416         set->set_lockh = lov_llh_new(lsm);
417         if (set->set_lockh == NULL)
418                 GOTO(out_set, rc = -ENOMEM);
419         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
420
421         for (i = 0; i < lsm->lsm_stripe_count; i++){
422                 struct lov_oinfo *loi;
423                 struct lov_request *req;
424                 obd_off start, end;
425
426                 loi = lsm->lsm_oinfo[i];
427                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
428                                            policy->l_extent.end, &start, &end))
429                         continue;
430
431                 /* FIXME raid1 should grace this error */
432                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
433                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
434                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
435                         GOTO(out_set, rc = -EIO);
436                 }
437
438                 OBD_ALLOC(req, sizeof(*req));
439                 if (req == NULL)
440                         GOTO(out_set, rc = -ENOMEM);
441
442                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
443                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
444                 if (req->rq_oi.oi_md == NULL) {
445                         OBD_FREE(req, sizeof(*req));
446                         GOTO(out_set, rc = -ENOMEM);
447                 }
448
449                 req->rq_oi.oi_policy.l_extent.start = start;
450                 req->rq_oi.oi_policy.l_extent.end = end;
451                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
452
453                 req->rq_idx = loi->loi_ost_idx;
454                 req->rq_stripe = i;
455
456                 /* XXX LOV STACKING: submd should be from the subobj */
457                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
458                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
459                 req->rq_oi.oi_md->lsm_stripe_count = 0;
460
461                 lov_set_add_req(req, set);
462         }
463         if (!set->set_count)
464                 GOTO(out_set, rc = -EIO);
465         *reqset = set;
466         RETURN(rc);
467 out_set:
468         lov_fini_match_set(set, mode, 0);
469         RETURN(rc);
470 }
471
472 int lov_fini_cancel_set(struct lov_request_set *set)
473 {
474         int rc = 0;
475         ENTRY;
476
477         if (set == NULL)
478                 RETURN(0);
479
480         LASSERT(set->set_exp);
481         if (set->set_lockh)
482                 lov_llh_put(set->set_lockh);
483
484         lov_put_reqset(set);
485
486         RETURN(rc);
487 }
488
489 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
490                         struct lov_stripe_md *lsm, __u32 mode,
491                         struct lustre_handle *lockh,
492                         struct lov_request_set **reqset)
493 {
494         struct lov_request_set *set;
495         int i, rc = 0;
496         ENTRY;
497
498         OBD_ALLOC(set, sizeof(*set));
499         if (set == NULL)
500                 RETURN(-ENOMEM);
501         lov_init_set(set);
502
503         set->set_exp = exp;
504         set->set_oi = oinfo;
505         set->set_oi->oi_md = lsm;
506         set->set_lockh = lov_handle2llh(lockh);
507         if (set->set_lockh == NULL) {
508                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
509                 GOTO(out_set, rc = -EINVAL);
510         }
511         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
512
513         for (i = 0; i < lsm->lsm_stripe_count; i++){
514                 struct lov_request *req;
515                 struct lustre_handle *lov_lockhp;
516                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
517
518                 lov_lockhp = set->set_lockh->llh_handles + i;
519                 if (!lustre_handle_is_used(lov_lockhp)) {
520                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
521                                loi->loi_ost_idx, loi->loi_id);
522                         continue;
523                 }
524
525                 OBD_ALLOC(req, sizeof(*req));
526                 if (req == NULL)
527                         GOTO(out_set, rc = -ENOMEM);
528
529                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
530                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
531                 if (req->rq_oi.oi_md == NULL) {
532                         OBD_FREE(req, sizeof(*req));
533                         GOTO(out_set, rc = -ENOMEM);
534                 }
535
536                 req->rq_idx = loi->loi_ost_idx;
537                 req->rq_stripe = i;
538
539                 /* XXX LOV STACKING: submd should be from the subobj */
540                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
541                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
542                 req->rq_oi.oi_md->lsm_stripe_count = 0;
543
544                 lov_set_add_req(req, set);
545         }
546         if (!set->set_count)
547                 GOTO(out_set, rc = -EIO);
548         *reqset = set;
549         RETURN(rc);
550 out_set:
551         lov_fini_cancel_set(set);
552         RETURN(rc);
553 }
554
555 static int lov_update_create_set(struct lov_request_set *set,
556                                  struct lov_request *req, int rc)
557 {
558         struct obd_trans_info *oti = set->set_oti;
559         struct lov_stripe_md *lsm = set->set_oi->oi_md;
560         struct lov_oinfo *loi;
561         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
562         ENTRY;
563
564         if (rc && lov->lov_tgts[req->rq_idx] &&
565             lov->lov_tgts[req->rq_idx]->ltd_active) {
566                 CERROR("error creating fid "LPX64" sub-object"
567                        " on OST idx %d/%d: rc = %d\n",
568                        set->set_oi->oi_oa->o_id, req->rq_idx,
569                        lsm->lsm_stripe_count, rc);
570                 if (rc > 0) {
571                         CERROR("obd_create returned invalid err %d\n", rc);
572                         rc = -EIO;
573                 }
574         }
575
576         cfs_spin_lock(&set->set_lock);
577         req->rq_stripe = set->set_success;
578         loi = lsm->lsm_oinfo[req->rq_stripe];
579
580
581         if (rc) {
582                 lov_update_set(set, req, rc);
583                 cfs_spin_unlock(&set->set_lock);
584                 RETURN(rc);
585         }
586
587         loi->loi_id = req->rq_oi.oi_oa->o_id;
588         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
589         loi->loi_ost_idx = req->rq_idx;
590         loi_init(loi);
591
592         if (oti && set->set_cookies)
593                 ++oti->oti_logcookies;
594         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
595                 set->set_cookie_sent++;
596
597         lov_update_set(set, req, rc);
598         cfs_spin_unlock(&set->set_lock);
599
600         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
601                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
602         RETURN(rc);
603 }
604
605 static int create_done(struct obd_export *exp, struct lov_request_set *set,
606                        struct lov_stripe_md **lsmp)
607 {
608         struct lov_obd *lov = &exp->exp_obd->u.lov;
609         struct obd_trans_info *oti = set->set_oti;
610         struct obdo *src_oa = set->set_oi->oi_oa;
611         struct lov_request *req;
612         struct obdo *ret_oa = NULL;
613         int attrset = 0, rc = 0;
614         ENTRY;
615
616         LASSERT(set->set_completes);
617
618         /* try alloc objects on other osts if osc_create fails for
619          * exceptions: RPC failure, ENOSPC, etc */
620         if (set->set_count != set->set_success) {
621                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
622                         if (req->rq_rc == 0)
623                                 continue;
624
625                         set->set_completes--;
626                         req->rq_complete = 0;
627
628                         rc = qos_remedy_create(set, req);
629                         lov_update_create_set(set, req, rc);
630                 }
631         }
632
633         /* no successful creates */
634         if (set->set_success == 0)
635                 GOTO(cleanup, rc);
636
637         if (set->set_count != set->set_success) {
638                 set->set_count = set->set_success;
639                 qos_shrink_lsm(set);
640         }
641
642         OBDO_ALLOC(ret_oa);
643         if (ret_oa == NULL)
644                 GOTO(cleanup, rc = -ENOMEM);
645
646         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
647                 if (!req->rq_complete || req->rq_rc)
648                         continue;
649                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
650                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
651                                 req->rq_stripe, &attrset);
652         }
653         if (src_oa->o_valid & OBD_MD_FLSIZE &&
654             ret_oa->o_size != src_oa->o_size) {
655                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
656                        src_oa->o_size, ret_oa->o_size);
657                 LBUG();
658         }
659         ret_oa->o_id = src_oa->o_id;
660         ret_oa->o_seq = src_oa->o_seq;
661         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
662         memcpy(src_oa, ret_oa, sizeof(*src_oa));
663         OBDO_FREE(ret_oa);
664
665         *lsmp = set->set_oi->oi_md;
666         GOTO(done, rc = 0);
667
668 cleanup:
669         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
670                 struct obd_export *sub_exp;
671                 int err = 0;
672
673                 if (!req->rq_complete || req->rq_rc)
674                         continue;
675
676                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
677                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
678                                   NULL);
679                 if (err)
680                         CERROR("Failed to uncreate objid "LPX64" subobj "
681                                LPX64" on OST idx %d: rc = %d\n",
682                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
683                                req->rq_idx, rc);
684         }
685         if (*lsmp == NULL)
686                 obd_free_memmd(exp, &set->set_oi->oi_md);
687 done:
688         if (oti && set->set_cookies) {
689                 oti->oti_logcookies = set->set_cookies;
690                 if (!set->set_cookie_sent) {
691                         oti_free_cookies(oti);
692                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
693                 } else {
694                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
695                 }
696         }
697         RETURN(rc);
698 }
699
700 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
701 {
702         int rc = 0;
703         ENTRY;
704
705         if (set == NULL)
706                 RETURN(0);
707         LASSERT(set->set_exp);
708         if (set->set_completes)
709                 rc = create_done(set->set_exp, set, lsmp);
710
711         lov_put_reqset(set);
712         RETURN(rc);
713 }
714
715 int cb_create_update(void *cookie, int rc)
716 {
717         struct obd_info *oinfo = cookie;
718         struct lov_request *lovreq;
719
720         lovreq = container_of(oinfo, struct lov_request, rq_oi);
721
722         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
723                 if (lovreq->rq_idx == cfs_fail_val)
724                         rc = -ENOTCONN;
725
726         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
727         if (lov_finished_set(lovreq->rq_rqset))
728                 lov_put_reqset(lovreq->rq_rqset);
729         return rc;
730 }
731
732 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
733                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
734                         struct obd_trans_info *oti,
735                         struct lov_request_set **reqset)
736 {
737         struct lov_request_set *set;
738         int rc = 0;
739         ENTRY;
740
741         OBD_ALLOC(set, sizeof(*set));
742         if (set == NULL)
743                 RETURN(-ENOMEM);
744         lov_init_set(set);
745
746         set->set_exp = exp;
747         set->set_oi = oinfo;
748         set->set_oi->oi_md = *lsmp;
749         set->set_oi->oi_oa = src_oa;
750         set->set_oti = oti;
751         lov_get_reqset(set);
752
753         rc = qos_prep_create(exp, set);
754         /* qos_shrink_lsm() may have allocated a new lsm */
755         *lsmp = oinfo->oi_md;
756         if (rc) {
757                 lov_fini_create_set(set, lsmp);
758                 lov_put_reqset(set);
759         } else {
760                 *reqset = set;
761         }
762         RETURN(rc);
763 }
764
765 static int common_attr_done(struct lov_request_set *set)
766 {
767         cfs_list_t *pos;
768         struct lov_request *req;
769         struct obdo *tmp_oa;
770         int rc = 0, attrset = 0;
771         ENTRY;
772
773         LASSERT(set->set_oi != NULL);
774
775         if (set->set_oi->oi_oa == NULL)
776                 RETURN(0);
777
778         if (!set->set_success)
779                 RETURN(-EIO);
780
781         OBDO_ALLOC(tmp_oa);
782         if (tmp_oa == NULL)
783                 GOTO(out, rc = -ENOMEM);
784
785         cfs_list_for_each (pos, &set->set_list) {
786                 req = cfs_list_entry(pos, struct lov_request, rq_link);
787
788                 if (!req->rq_complete || req->rq_rc)
789                         continue;
790                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
791                         continue;
792                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
793                                 req->rq_oi.oi_oa->o_valid,
794                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
795         }
796         if (!attrset) {
797                 CERROR("No stripes had valid attrs\n");
798                 rc = -EIO;
799         }
800         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
801             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
802                 /* When we take attributes of some epoch, we require all the
803                  * ost to be active. */
804                 CERROR("Not all the stripes had valid attrs\n");
805                 GOTO(out, rc = -EIO);
806         }
807
808         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
809         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
810 out:
811         if (tmp_oa)
812                 OBDO_FREE(tmp_oa);
813         RETURN(rc);
814
815 }
816
817 static int brw_done(struct lov_request_set *set)
818 {
819         struct lov_stripe_md *lsm = set->set_oi->oi_md;
820         struct lov_oinfo     *loi = NULL;
821         cfs_list_t *pos;
822         struct lov_request *req;
823         ENTRY;
824
825         cfs_list_for_each (pos, &set->set_list) {
826                 req = cfs_list_entry(pos, struct lov_request, rq_link);
827
828                 if (!req->rq_complete || req->rq_rc)
829                         continue;
830
831                 loi = lsm->lsm_oinfo[req->rq_stripe];
832
833                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
834                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
835         }
836
837         RETURN(0);
838 }
839
840 int lov_fini_brw_set(struct lov_request_set *set)
841 {
842         int rc = 0;
843         ENTRY;
844
845         if (set == NULL)
846                 RETURN(0);
847         LASSERT(set->set_exp);
848         if (set->set_completes) {
849                 rc = brw_done(set);
850                 /* FIXME update qos data here */
851         }
852         lov_put_reqset(set);
853
854         RETURN(rc);
855 }
856
857 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
858                      obd_count oa_bufs, struct brw_page *pga,
859                      struct obd_trans_info *oti,
860                      struct lov_request_set **reqset)
861 {
862         struct {
863                 obd_count       index;
864                 obd_count       count;
865                 obd_count       off;
866         } *info = NULL;
867         struct lov_request_set *set;
868         struct lov_obd *lov = &exp->exp_obd->u.lov;
869         int rc = 0, i, shift;
870         ENTRY;
871
872         OBD_ALLOC(set, sizeof(*set));
873         if (set == NULL)
874                 RETURN(-ENOMEM);
875         lov_init_set(set);
876
877         set->set_exp = exp;
878         set->set_oti = oti;
879         set->set_oi = oinfo;
880         set->set_oabufs = oa_bufs;
881         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
882         if (!set->set_pga)
883                 GOTO(out, rc = -ENOMEM);
884
885         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
886         if (!info)
887                 GOTO(out, rc = -ENOMEM);
888
889         /* calculate the page count for each stripe */
890         for (i = 0; i < oa_bufs; i++) {
891                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
892                 info[stripe].count++;
893         }
894
895         /* alloc and initialize lov request */
896         shift = 0;
897         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
898                 struct lov_oinfo *loi = NULL;
899                 struct lov_request *req;
900
901                 if (info[i].count == 0)
902                         continue;
903
904                 loi = oinfo->oi_md->lsm_oinfo[i];
905                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
906                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
907                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
908                         GOTO(out, rc = -EIO);
909                 }
910
911                 OBD_ALLOC(req, sizeof(*req));
912                 if (req == NULL)
913                         GOTO(out, rc = -ENOMEM);
914
915                 OBDO_ALLOC(req->rq_oi.oi_oa);
916                 if (req->rq_oi.oi_oa == NULL) {
917                         OBD_FREE(req, sizeof(*req));
918                         GOTO(out, rc = -ENOMEM);
919                 }
920
921                 if (oinfo->oi_oa) {
922                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
923                                sizeof(*req->rq_oi.oi_oa));
924                 }
925                 req->rq_oi.oi_oa->o_id = loi->loi_id;
926                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
927                 req->rq_oi.oi_oa->o_stripe_idx = i;
928
929                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
930                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
931                 if (req->rq_oi.oi_md == NULL) {
932                         OBDO_FREE(req->rq_oi.oi_oa);
933                         OBD_FREE(req, sizeof(*req));
934                         GOTO(out, rc = -ENOMEM);
935                 }
936
937                 req->rq_idx = loi->loi_ost_idx;
938                 req->rq_stripe = i;
939
940                 /* XXX LOV STACKING */
941                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
942                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
943                 req->rq_oabufs = info[i].count;
944                 req->rq_pgaidx = shift;
945                 shift += req->rq_oabufs;
946
947                 /* remember the index for sort brw_page array */
948                 info[i].index = req->rq_pgaidx;
949
950                 req->rq_oi.oi_capa = oinfo->oi_capa;
951
952                 lov_set_add_req(req, set);
953         }
954         if (!set->set_count)
955                 GOTO(out, rc = -EIO);
956
957         /* rotate & sort the brw_page array */
958         for (i = 0; i < oa_bufs; i++) {
959                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
960
961                 shift = info[stripe].index + info[stripe].off;
962                 LASSERT(shift < oa_bufs);
963                 set->set_pga[shift] = pga[i];
964                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
965                                   &set->set_pga[shift].off);
966                 info[stripe].off++;
967         }
968 out:
969         if (info)
970                 OBD_FREE_LARGE(info,
971                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
972
973         if (rc == 0)
974                 *reqset = set;
975         else
976                 lov_fini_brw_set(set);
977
978         RETURN(rc);
979 }
980
981 int lov_fini_getattr_set(struct lov_request_set *set)
982 {
983         int rc = 0;
984         ENTRY;
985
986         if (set == NULL)
987                 RETURN(0);
988         LASSERT(set->set_exp);
989         if (set->set_completes)
990                 rc = common_attr_done(set);
991
992         lov_put_reqset(set);
993
994         RETURN(rc);
995 }
996
997 /* The callback for osc_getattr_async that finilizes a request info when a
998  * response is received. */
999 static int cb_getattr_update(void *cookie, int rc)
1000 {
1001         struct obd_info *oinfo = cookie;
1002         struct lov_request *lovreq;
1003         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1004         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1005 }
1006
1007 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1008                          struct lov_request_set **reqset)
1009 {
1010         struct lov_request_set *set;
1011         struct lov_obd *lov = &exp->exp_obd->u.lov;
1012         int rc = 0, i;
1013         ENTRY;
1014
1015         OBD_ALLOC(set, sizeof(*set));
1016         if (set == NULL)
1017                 RETURN(-ENOMEM);
1018         lov_init_set(set);
1019
1020         set->set_exp = exp;
1021         set->set_oi = oinfo;
1022
1023         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1024                 struct lov_oinfo *loi;
1025                 struct lov_request *req;
1026
1027                 loi = oinfo->oi_md->lsm_oinfo[i];
1028                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1029                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1030                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1031                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1032                                 /* SOM requires all the OSTs to be active. */
1033                                 GOTO(out_set, rc = -EIO);
1034                         continue;
1035                 }
1036
1037                 OBD_ALLOC(req, sizeof(*req));
1038                 if (req == NULL)
1039                         GOTO(out_set, rc = -ENOMEM);
1040
1041                 req->rq_stripe = i;
1042                 req->rq_idx = loi->loi_ost_idx;
1043
1044                 OBDO_ALLOC(req->rq_oi.oi_oa);
1045                 if (req->rq_oi.oi_oa == NULL) {
1046                         OBD_FREE(req, sizeof(*req));
1047                         GOTO(out_set, rc = -ENOMEM);
1048                 }
1049                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1050                        sizeof(*req->rq_oi.oi_oa));
1051                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1052                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1053                 req->rq_oi.oi_cb_up = cb_getattr_update;
1054                 req->rq_oi.oi_capa = oinfo->oi_capa;
1055
1056                 lov_set_add_req(req, set);
1057         }
1058         if (!set->set_count)
1059                 GOTO(out_set, rc = -EIO);
1060         *reqset = set;
1061         RETURN(rc);
1062 out_set:
1063         lov_fini_getattr_set(set);
1064         RETURN(rc);
1065 }
1066
1067 int lov_fini_destroy_set(struct lov_request_set *set)
1068 {
1069         ENTRY;
1070
1071         if (set == NULL)
1072                 RETURN(0);
1073         LASSERT(set->set_exp);
1074         if (set->set_completes) {
1075                 /* FIXME update qos data here */
1076         }
1077
1078         lov_put_reqset(set);
1079
1080         RETURN(0);
1081 }
1082
1083 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1084                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1085                          struct obd_trans_info *oti,
1086                          struct lov_request_set **reqset)
1087 {
1088         struct lov_request_set *set;
1089         struct lov_obd *lov = &exp->exp_obd->u.lov;
1090         int rc = 0, i;
1091         ENTRY;
1092
1093         OBD_ALLOC(set, sizeof(*set));
1094         if (set == NULL)
1095                 RETURN(-ENOMEM);
1096         lov_init_set(set);
1097
1098         set->set_exp = exp;
1099         set->set_oi = oinfo;
1100         set->set_oi->oi_md = lsm;
1101         set->set_oi->oi_oa = src_oa;
1102         set->set_oti = oti;
1103         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1104                 set->set_cookies = oti->oti_logcookies;
1105
1106         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1107                 struct lov_oinfo *loi;
1108                 struct lov_request *req;
1109
1110                 loi = lsm->lsm_oinfo[i];
1111                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1112                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1113                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1114                         continue;
1115                 }
1116
1117                 OBD_ALLOC(req, sizeof(*req));
1118                 if (req == NULL)
1119                         GOTO(out_set, rc = -ENOMEM);
1120
1121                 req->rq_stripe = i;
1122                 req->rq_idx = loi->loi_ost_idx;
1123
1124                 OBDO_ALLOC(req->rq_oi.oi_oa);
1125                 if (req->rq_oi.oi_oa == NULL) {
1126                         OBD_FREE(req, sizeof(*req));
1127                         GOTO(out_set, rc = -ENOMEM);
1128                 }
1129                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1130                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1131                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1132                 lov_set_add_req(req, set);
1133         }
1134         if (!set->set_count)
1135                 GOTO(out_set, rc = -EIO);
1136         *reqset = set;
1137         RETURN(rc);
1138 out_set:
1139         lov_fini_destroy_set(set);
1140         RETURN(rc);
1141 }
1142
1143 int lov_fini_setattr_set(struct lov_request_set *set)
1144 {
1145         int rc = 0;
1146         ENTRY;
1147
1148         if (set == NULL)
1149                 RETURN(0);
1150         LASSERT(set->set_exp);
1151         if (set->set_completes) {
1152                 rc = common_attr_done(set);
1153                 /* FIXME update qos data here */
1154         }
1155
1156         lov_put_reqset(set);
1157         RETURN(rc);
1158 }
1159
1160 int lov_update_setattr_set(struct lov_request_set *set,
1161                            struct lov_request *req, int rc)
1162 {
1163         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1164         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1165         ENTRY;
1166
1167         lov_update_set(set, req, rc);
1168
1169         /* grace error on inactive ost */
1170         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1171                     lov->lov_tgts[req->rq_idx]->ltd_active))
1172                 rc = 0;
1173
1174         if (rc == 0) {
1175                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1176                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1177                                 req->rq_oi.oi_oa->o_ctime;
1178                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1179                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1180                                 req->rq_oi.oi_oa->o_mtime;
1181                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1182                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1183                                 req->rq_oi.oi_oa->o_atime;
1184         }
1185
1186         RETURN(rc);
1187 }
1188
1189 /* The callback for osc_setattr_async that finilizes a request info when a
1190  * response is received. */
1191 static int cb_setattr_update(void *cookie, int rc)
1192 {
1193         struct obd_info *oinfo = cookie;
1194         struct lov_request *lovreq;
1195         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1196         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1197 }
1198
1199 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1200                          struct obd_trans_info *oti,
1201                          struct lov_request_set **reqset)
1202 {
1203         struct lov_request_set *set;
1204         struct lov_obd *lov = &exp->exp_obd->u.lov;
1205         int rc = 0, i;
1206         ENTRY;
1207
1208         OBD_ALLOC(set, sizeof(*set));
1209         if (set == NULL)
1210                 RETURN(-ENOMEM);
1211         lov_init_set(set);
1212
1213         set->set_exp = exp;
1214         set->set_oti = oti;
1215         set->set_oi = oinfo;
1216         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1217                 set->set_cookies = oti->oti_logcookies;
1218
1219         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1220                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1221                 struct lov_request *req;
1222
1223                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1224                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1225                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1226                         continue;
1227                 }
1228
1229                 OBD_ALLOC(req, sizeof(*req));
1230                 if (req == NULL)
1231                         GOTO(out_set, rc = -ENOMEM);
1232                 req->rq_stripe = i;
1233                 req->rq_idx = loi->loi_ost_idx;
1234
1235                 OBDO_ALLOC(req->rq_oi.oi_oa);
1236                 if (req->rq_oi.oi_oa == NULL) {
1237                         OBD_FREE(req, sizeof(*req));
1238                         GOTO(out_set, rc = -ENOMEM);
1239                 }
1240                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1241                        sizeof(*req->rq_oi.oi_oa));
1242                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1243                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1244                 req->rq_oi.oi_oa->o_stripe_idx = i;
1245                 req->rq_oi.oi_cb_up = cb_setattr_update;
1246                 req->rq_oi.oi_capa = oinfo->oi_capa;
1247
1248                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1249                         int off = lov_stripe_offset(oinfo->oi_md,
1250                                                     oinfo->oi_oa->o_size, i,
1251                                                     &req->rq_oi.oi_oa->o_size);
1252
1253                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1254                                 req->rq_oi.oi_oa->o_size--;
1255
1256                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1257                                i, req->rq_oi.oi_oa->o_size,
1258                                oinfo->oi_oa->o_size);
1259                 }
1260                 lov_set_add_req(req, set);
1261         }
1262         if (!set->set_count)
1263                 GOTO(out_set, rc = -EIO);
1264         *reqset = set;
1265         RETURN(rc);
1266 out_set:
1267         lov_fini_setattr_set(set);
1268         RETURN(rc);
1269 }
1270
1271 int lov_fini_punch_set(struct lov_request_set *set)
1272 {
1273         int rc = 0;
1274         ENTRY;
1275
1276         if (set == NULL)
1277                 RETURN(0);
1278         LASSERT(set->set_exp);
1279         if (set->set_completes) {
1280                 rc = -EIO;
1281                 /* FIXME update qos data here */
1282                 if (set->set_success)
1283                         rc = common_attr_done(set);
1284         }
1285
1286         lov_put_reqset(set);
1287
1288         RETURN(rc);
1289 }
1290
1291 int lov_update_punch_set(struct lov_request_set *set,
1292                          struct lov_request *req, int rc)
1293 {
1294         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1295         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1296         ENTRY;
1297
1298         lov_update_set(set, req, rc);
1299
1300         /* grace error on inactive ost */
1301         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1302                 rc = 0;
1303
1304         if (rc == 0) {
1305                 lov_stripe_lock(lsm);
1306                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1307                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1308                                 req->rq_oi.oi_oa->o_blocks;
1309                 }
1310
1311                 /* Do we need to update lvb_size here? It needn't because
1312                  * it have been done in ll_truncate(). -jay */
1313                 lov_stripe_unlock(lsm);
1314         }
1315
1316         RETURN(rc);
1317 }
1318
1319 /* The callback for osc_punch that finilizes a request info when a response
1320  * is received. */
1321 static int cb_update_punch(void *cookie, int rc)
1322 {
1323         struct obd_info *oinfo = cookie;
1324         struct lov_request *lovreq;
1325         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1326         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1327 }
1328
1329 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1330                        struct obd_trans_info *oti,
1331                        struct lov_request_set **reqset)
1332 {
1333         struct lov_request_set *set;
1334         struct lov_obd *lov = &exp->exp_obd->u.lov;
1335         int rc = 0, i;
1336         ENTRY;
1337
1338         OBD_ALLOC(set, sizeof(*set));
1339         if (set == NULL)
1340                 RETURN(-ENOMEM);
1341         lov_init_set(set);
1342
1343         set->set_oi = oinfo;
1344         set->set_exp = exp;
1345
1346         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1347                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1348                 struct lov_request *req;
1349                 obd_off rs, re;
1350
1351                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1352                                            oinfo->oi_policy.l_extent.start,
1353                                            oinfo->oi_policy.l_extent.end,
1354                                            &rs, &re))
1355                         continue;
1356
1357                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1358                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1359                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1360                         GOTO(out_set, rc = -EIO);
1361                 }
1362
1363                 OBD_ALLOC(req, sizeof(*req));
1364                 if (req == NULL)
1365                         GOTO(out_set, rc = -ENOMEM);
1366                 req->rq_stripe = i;
1367                 req->rq_idx = loi->loi_ost_idx;
1368
1369                 OBDO_ALLOC(req->rq_oi.oi_oa);
1370                 if (req->rq_oi.oi_oa == NULL) {
1371                         OBD_FREE(req, sizeof(*req));
1372                         GOTO(out_set, rc = -ENOMEM);
1373                 }
1374                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1375                        sizeof(*req->rq_oi.oi_oa));
1376                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1377                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1378                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1379
1380                 req->rq_oi.oi_oa->o_stripe_idx = i;
1381                 req->rq_oi.oi_cb_up = cb_update_punch;
1382
1383                 req->rq_oi.oi_policy.l_extent.start = rs;
1384                 req->rq_oi.oi_policy.l_extent.end = re;
1385                 req->rq_oi.oi_policy.l_extent.gid = -1;
1386
1387                 req->rq_oi.oi_capa = oinfo->oi_capa;
1388
1389                 lov_set_add_req(req, set);
1390         }
1391         if (!set->set_count)
1392                 GOTO(out_set, rc = -EIO);
1393         *reqset = set;
1394         RETURN(rc);
1395 out_set:
1396         lov_fini_punch_set(set);
1397         RETURN(rc);
1398 }
1399
1400 int lov_fini_sync_set(struct lov_request_set *set)
1401 {
1402         int rc = 0;
1403         ENTRY;
1404
1405         if (set == NULL)
1406                 RETURN(0);
1407         LASSERT(set->set_exp);
1408         if (set->set_completes) {
1409                 if (!set->set_success)
1410                         rc = -EIO;
1411                 /* FIXME update qos data here */
1412         }
1413
1414         lov_put_reqset(set);
1415
1416         RETURN(rc);
1417 }
1418
1419 /* The callback for osc_sync that finilizes a request info when a
1420  * response is recieved. */
1421 static int cb_sync_update(void *cookie, int rc)
1422 {
1423         struct obd_info *oinfo = cookie;
1424         struct lov_request *lovreq;
1425
1426         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1427         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1428 }
1429
1430 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1431                       obd_off start, obd_off end,
1432                       struct lov_request_set **reqset)
1433 {
1434         struct lov_request_set *set;
1435         struct lov_obd *lov = &exp->exp_obd->u.lov;
1436         int rc = 0, i;
1437         ENTRY;
1438
1439         OBD_ALLOC_PTR(set);
1440         if (set == NULL)
1441                 RETURN(-ENOMEM);
1442         lov_init_set(set);
1443
1444         set->set_exp = exp;
1445         set->set_oi = oinfo;
1446
1447         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1448                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1449                 struct lov_request *req;
1450                 obd_off rs, re;
1451
1452                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1453                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1454                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1455                         continue;
1456                 }
1457
1458                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1459                                            &re))
1460                         continue;
1461
1462                 OBD_ALLOC_PTR(req);
1463                 if (req == NULL)
1464                         GOTO(out_set, rc = -ENOMEM);
1465                 req->rq_stripe = i;
1466                 req->rq_idx = loi->loi_ost_idx;
1467
1468                 OBDO_ALLOC(req->rq_oi.oi_oa);
1469                 if (req->rq_oi.oi_oa == NULL) {
1470                         OBD_FREE(req, sizeof(*req));
1471                         GOTO(out_set, rc = -ENOMEM);
1472                 }
1473                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1474                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1475                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1476                 req->rq_oi.oi_oa->o_stripe_idx = i;
1477
1478                 req->rq_oi.oi_policy.l_extent.start = rs;
1479                 req->rq_oi.oi_policy.l_extent.end = re;
1480                 req->rq_oi.oi_policy.l_extent.gid = -1;
1481                 req->rq_oi.oi_cb_up = cb_sync_update;
1482
1483                 lov_set_add_req(req, set);
1484         }
1485         if (!set->set_count)
1486                 GOTO(out_set, rc = -EIO);
1487         *reqset = set;
1488         RETURN(rc);
1489 out_set:
1490         lov_fini_sync_set(set);
1491         RETURN(rc);
1492 }
1493
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating add: tot += add, except that when the sum wraps around (the
 * result compares below the old total) tot is pinned to LOV_U64_MAX.
 * Both arguments are evaluated more than once, so they must be free of
 * side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                (tot) = ((tot) + (add) < (tot)) ?                       \
                        LOV_U64_MAX : (tot) + (add);                    \
        } while (0)
1502
1503 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1504 {
1505         ENTRY;
1506
1507         if (success) {
1508                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1509
1510                 if (osfs->os_files != LOV_U64_MAX)
1511                         do_div(osfs->os_files, expected_stripes);
1512                 if (osfs->os_ffree != LOV_U64_MAX)
1513                         do_div(osfs->os_ffree, expected_stripes);
1514
1515                 cfs_spin_lock(&obd->obd_osfs_lock);
1516                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1517                 obd->obd_osfs_age = cfs_time_current_64();
1518                 cfs_spin_unlock(&obd->obd_osfs_lock);
1519                 RETURN(0);
1520         }
1521
1522         RETURN(-EIO);
1523 }
1524
1525 int lov_fini_statfs_set(struct lov_request_set *set)
1526 {
1527         int rc = 0;
1528         ENTRY;
1529
1530         if (set == NULL)
1531                 RETURN(0);
1532
1533         if (set->set_completes) {
1534                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1535                                      set->set_success);
1536         }
1537         lov_put_reqset(set);
1538         RETURN(rc);
1539 }
1540
1541 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1542                        int success)
1543 {
1544         int shift = 0, quit = 0;
1545         __u64 tmp;
1546
1547         if (success == 0) {
1548                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1549         } else {
1550                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1551                         /* assume all block sizes are always powers of 2 */
1552                         /* get the bits difference */
1553                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1554                         for (shift = 0; shift <= 64; ++shift) {
1555                                 if (tmp & 1) {
1556                                         if (quit)
1557                                                 break;
1558                                         else
1559                                                 quit = 1;
1560                                         shift = 0;
1561                                 }
1562                                 tmp >>= 1;
1563                         }
1564                 }
1565
1566                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1567                         osfs->os_bsize = lov_sfs->os_bsize;
1568
1569                         osfs->os_bfree  >>= shift;
1570                         osfs->os_bavail >>= shift;
1571                         osfs->os_blocks >>= shift;
1572                 } else if (shift != 0) {
1573                         lov_sfs->os_bfree  >>= shift;
1574                         lov_sfs->os_bavail >>= shift;
1575                         lov_sfs->os_blocks >>= shift;
1576                 }
1577 #ifdef MIN_DF
1578                 /* Sandia requested that df (and so, statfs) only
1579                    returned minimal available space on
1580                    a single OST, so people would be able to
1581                    write this much data guaranteed. */
1582                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1583                         /* Presumably if new bavail is smaller,
1584                            new bfree is bigger as well */
1585                         osfs->os_bfree = lov_sfs->os_bfree;
1586                         osfs->os_bavail = lov_sfs->os_bavail;
1587                 }
1588 #else
1589                 osfs->os_bfree += lov_sfs->os_bfree;
1590                 osfs->os_bavail += lov_sfs->os_bavail;
1591 #endif
1592                 osfs->os_blocks += lov_sfs->os_blocks;
1593                 /* XXX not sure about this one - depends on policy.
1594                  *   - could be minimum if we always stripe on all OBDs
1595                  *     (but that would be wrong for any other policy,
1596                  *     if one of the OBDs has no more objects left)
1597                  *   - could be sum if we stripe whole objects
1598                  *   - could be average, just to give a nice number
1599                  *
1600                  * To give a "reasonable" (if not wholly accurate)
1601                  * number, we divide the total number of free objects
1602                  * by expected stripe count (watch out for overflow).
1603                  */
1604                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1605                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1606         }
1607 }
1608
1609 /* The callback for osc_statfs_async that finilizes a request info when a
1610  * response is received. */
1611 static int cb_statfs_update(void *cookie, int rc)
1612 {
1613         struct obd_info *oinfo = cookie;
1614         struct lov_request *lovreq;
1615         struct obd_statfs *osfs, *lov_sfs;
1616         struct lov_obd *lov;
1617         struct lov_tgt_desc *tgt;
1618         struct obd_device *lovobd, *tgtobd;
1619         int success;
1620         ENTRY;
1621
1622         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1623         lovobd = lovreq->rq_rqset->set_obd;
1624         lov = &lovobd->u.lov;
1625         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1626         lov_sfs = oinfo->oi_osfs;
1627         success = lovreq->rq_rqset->set_success;
1628         /* XXX: the same is done in lov_update_common_set, however
1629            lovset->set_exp is not initialized. */
1630         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1631         if (rc)
1632                 GOTO(out, rc);
1633
1634         obd_getref(lovobd);
1635         tgt = lov->lov_tgts[lovreq->rq_idx];
1636         if (!tgt || !tgt->ltd_active)
1637                 GOTO(out_update, rc);
1638
1639         tgtobd = class_exp2obd(tgt->ltd_exp);
1640         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1641         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1642         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1643                 tgtobd->obd_osfs_age = cfs_time_current_64();
1644         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1645
1646 out_update:
1647         lov_update_statfs(osfs, lov_sfs, success);
1648         qos_update(lov);
1649         obd_putref(lovobd);
1650
1651 out:
1652         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1653             lov_finished_set(lovreq->rq_rqset)) {
1654                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1655                                     lovreq->rq_rqset->set_success !=
1656                                                   lovreq->rq_rqset->set_count);
1657                qos_statfs_done(lov);
1658         }
1659
1660         RETURN(0);
1661 }
1662
1663 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1664                         struct lov_request_set **reqset)
1665 {
1666         struct lov_request_set *set;
1667         struct lov_obd *lov = &obd->u.lov;
1668         int rc = 0, i;
1669         ENTRY;
1670
1671         OBD_ALLOC(set, sizeof(*set));
1672         if (set == NULL)
1673                 RETURN(-ENOMEM);
1674         lov_init_set(set);
1675
1676         set->set_obd = obd;
1677         set->set_oi = oinfo;
1678
1679         /* We only get block data from the OBD */
1680         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1681                 struct lov_request *req;
1682
1683                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1684                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1685                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1686                         continue;
1687                 }
1688
1689                 /* skip targets that have been explicitely disabled by the
1690                  * administrator */
1691                 if (!lov->lov_tgts[i]->ltd_exp) {
1692                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1693                         continue;
1694                 }
1695
1696                 OBD_ALLOC(req, sizeof(*req));
1697                 if (req == NULL)
1698                         GOTO(out_set, rc = -ENOMEM);
1699
1700                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1701                 if (req->rq_oi.oi_osfs == NULL) {
1702                         OBD_FREE(req, sizeof(*req));
1703                         GOTO(out_set, rc = -ENOMEM);
1704                 }
1705
1706                 req->rq_idx = i;
1707                 req->rq_oi.oi_cb_up = cb_statfs_update;
1708                 req->rq_oi.oi_flags = oinfo->oi_flags;
1709
1710                 lov_set_add_req(req, set);
1711         }
1712         if (!set->set_count)
1713                 GOTO(out_set, rc = -EIO);
1714         *reqset = set;
1715         RETURN(rc);
1716 out_set:
1717         lov_fini_statfs_set(set);
1718         RETURN(rc);
1719 }