Whamcloud - gitweb
b=20607 Use refcount on lov_request_set to prevent memory corruption.
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         struct list_head *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = list_entry(pos, struct lov_request,
74                                                      rq_link);
75                 list_del_init(&req->rq_link);
76
77                 if (req->rq_oi.oi_oa)
78                         OBDO_FREE(req->rq_oi.oi_oa);
79                 if (req->rq_oi.oi_md)
80                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81                 if (req->rq_oi.oi_osfs)
82                         OBD_FREE(req->rq_oi.oi_osfs,
83                                  sizeof(*req->rq_oi.oi_osfs));
84                 OBD_FREE(req, sizeof(*req));
85         }
86
87         if (set->set_pga) {
88                 int len = set->set_oabufs * sizeof(*set->set_pga);
89                 OBD_FREE(set->set_pga, len);
90         }
91         if (set->set_lockh)
92                 lov_llh_put(set->set_lockh);
93
94         OBD_FREE(set, sizeof(*set));
95         EXIT;
96 }
97
98 int lov_finished_set(struct lov_request_set *set)
99 {
100         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
101                set->set_count);
102         return set->set_completes == set->set_count;
103 }
104
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         set->set_completes++;
113         if (rc == 0)
114                 set->set_success++;
115
116         cfs_waitq_signal(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123         ENTRY;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
144                                struct lov_oinfo *loi, int flags,
145                                struct ost_lvb *lvb, __u32 mode, int rc);
146
147 static int lov_update_enqueue_lov(struct obd_export *exp,
148                                   struct lustre_handle *lov_lockhp,
149                                   struct lov_oinfo *loi, int flags, int idx,
150                                   __u64 oid, int rc)
151 {
152         struct lov_obd *lov = &exp->exp_obd->u.lov;
153
154         if (rc != ELDLM_OK &&
155             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
156                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
158                         /* -EUSERS used by OST to report file contention */
159                         if (rc != -EINTR && rc != -EUSERS)
160                                 CERROR("enqueue objid "LPX64" subobj "
161                                        LPX64" on OST idx %d: rc %d\n",
162                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
163                 } else
164                         rc = ELDLM_OK;
165         }
166         return rc;
167 }
168
169 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
170 {
171         struct lov_request_set *set = req->rq_rqset;
172         struct lustre_handle *lov_lockhp;
173         struct obd_info *oi = set->set_oi;
174         struct lov_oinfo *loi;
175         ENTRY;
176
177         LASSERT(oi != NULL);
178
179         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
180         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
181
182         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
183          * and that copy can be arbitrarily out of date.
184          *
185          * The LOV API is due for a serious rewriting anyways, and this
186          * can be addressed then. */
187
188         lov_stripe_lock(oi->oi_md);
189         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
190                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
191         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
192                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
193         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
194                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
195         lov_stripe_unlock(oi->oi_md);
196         lov_update_set(set, req, rc);
197         RETURN(rc);
198 }
199
200 /* The callback for osc_enqueue that updates lov info for every OSC request. */
201 static int cb_update_enqueue(void *cookie, int rc)
202 {
203         struct obd_info *oinfo = cookie;
204         struct ldlm_enqueue_info *einfo;
205         struct lov_request *lovreq;
206
207         lovreq = container_of(oinfo, struct lov_request, rq_oi);
208         einfo = lovreq->rq_rqset->set_ei;
209         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
210 }
211
212 static int enqueue_done(struct lov_request_set *set, __u32 mode)
213 {
214         struct lov_request *req;
215         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
216         int rc = 0;
217         ENTRY;
218
219         /* enqueue/match success, just return */
220         if (set->set_completes && set->set_completes == set->set_success)
221                 RETURN(0);
222
223         /* cancel enqueued/matched locks */
224         list_for_each_entry(req, &set->set_list, rq_link) {
225                 struct lustre_handle *lov_lockhp;
226
227                 if (!req->rq_complete || req->rq_rc)
228                         continue;
229
230                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
231                 LASSERT(lov_lockhp);
232                 if (!lustre_handle_is_used(lov_lockhp))
233                         continue;
234
235                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
236                                 req->rq_oi.oi_md, mode, lov_lockhp);
237                 if (rc && lov->lov_tgts[req->rq_idx] &&
238                     lov->lov_tgts[req->rq_idx]->ltd_active)
239                         CERROR("cancelling obdjid "LPX64" on OST "
240                                "idx %d error: rc = %d\n",
241                                req->rq_oi.oi_md->lsm_object_id,
242                                req->rq_idx, rc);
243         }
244         if (set->set_lockh)
245                 lov_llh_put(set->set_lockh);
246         RETURN(rc);
247 }
248
249 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
250                          struct ptlrpc_request_set *rqset)
251 {
252         int ret = 0;
253         ENTRY;
254
255         if (set == NULL)
256                 RETURN(0);
257         LASSERT(set->set_exp);
258         /* Do enqueue_done only for sync requests and if any request
259          * succeeded. */
260         if (!rqset) {
261                 if (rc)
262                         set->set_completes = 0;
263                 ret = enqueue_done(set, mode);
264         } else if (set->set_lockh)
265                 lov_llh_put(set->set_lockh);
266
267         lov_put_reqset(set);
268
269         RETURN(rc ? rc : ret);
270 }
271
272 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
273                          struct ldlm_enqueue_info *einfo,
274                          struct lov_request_set **reqset)
275 {
276         struct lov_obd *lov = &exp->exp_obd->u.lov;
277         struct lov_request_set *set;
278         int i, rc = 0;
279         ENTRY;
280
281         OBD_ALLOC(set, sizeof(*set));
282         if (set == NULL)
283                 RETURN(-ENOMEM);
284         lov_init_set(set);
285
286         set->set_exp = exp;
287         set->set_oi = oinfo;
288         set->set_ei = einfo;
289         set->set_lockh = lov_llh_new(oinfo->oi_md);
290         if (set->set_lockh == NULL)
291                 GOTO(out_set, rc = -ENOMEM);
292         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
293
294         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
295                 struct lov_oinfo *loi;
296                 struct lov_request *req;
297                 obd_off start, end;
298
299                 loi = oinfo->oi_md->lsm_oinfo[i];
300                 if (!lov_stripe_intersects(oinfo->oi_md, i,
301                                            oinfo->oi_policy.l_extent.start,
302                                            oinfo->oi_policy.l_extent.end,
303                                            &start, &end))
304                         continue;
305
306                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
307                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
308                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
309                         continue;
310                 }
311
312                 OBD_ALLOC(req, sizeof(*req));
313                 if (req == NULL)
314                         GOTO(out_set, rc = -ENOMEM);
315
316                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
317                         sizeof(struct lov_oinfo *) +
318                         sizeof(struct lov_oinfo);
319                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
320                 if (req->rq_oi.oi_md == NULL) {
321                         OBD_FREE(req, sizeof(*req));
322                         GOTO(out_set, rc = -ENOMEM);
323                 }
324                 req->rq_oi.oi_md->lsm_oinfo[0] =
325                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
326                         sizeof(struct lov_oinfo *);
327
328                 /* Set lov request specific parameters. */
329                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
330                 req->rq_oi.oi_cb_up = cb_update_enqueue;
331                 req->rq_oi.oi_flags = oinfo->oi_flags;
332
333                 LASSERT(req->rq_oi.oi_lockh);
334
335                 req->rq_oi.oi_policy.l_extent.gid =
336                         oinfo->oi_policy.l_extent.gid;
337                 req->rq_oi.oi_policy.l_extent.start = start;
338                 req->rq_oi.oi_policy.l_extent.end = end;
339
340                 req->rq_idx = loi->loi_ost_idx;
341                 req->rq_stripe = i;
342
343                 /* XXX LOV STACKING: submd should be from the subobj */
344                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
345                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
346                 req->rq_oi.oi_md->lsm_stripe_count = 0;
347                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
348                         loi->loi_kms_valid;
349                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
351
352                 lov_set_add_req(req, set);
353         }
354         if (!set->set_count)
355                 GOTO(out_set, rc = -EIO);
356         *reqset = set;
357         RETURN(0);
358 out_set:
359         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
360         RETURN(rc);
361 }
362
363 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
364                          int rc)
365 {
366         int ret = rc;
367         ENTRY;
368
369         if (rc > 0)
370                 ret = 0;
371         else if (rc == 0)
372                 ret = 1;
373         lov_update_set(set, req, ret);
374         RETURN(rc);
375 }
376
377 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
378 {
379         int rc = 0;
380         ENTRY;
381
382         if (set == NULL)
383                 RETURN(0);
384         LASSERT(set->set_exp);
385         rc = enqueue_done(set, mode);
386         if ((set->set_count == set->set_success) &&
387             (flags & LDLM_FL_TEST_LOCK))
388                 lov_llh_put(set->set_lockh);
389
390         lov_put_reqset(set);
391
392         RETURN(rc);
393 }
394
395 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
396                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
397                        __u32 mode, struct lustre_handle *lockh,
398                        struct lov_request_set **reqset)
399 {
400         struct lov_obd *lov = &exp->exp_obd->u.lov;
401         struct lov_request_set *set;
402         int i, rc = 0;
403         ENTRY;
404
405         OBD_ALLOC(set, sizeof(*set));
406         if (set == NULL)
407                 RETURN(-ENOMEM);
408         lov_init_set(set);
409
410         set->set_exp = exp;
411         set->set_oi = oinfo;
412         set->set_oi->oi_md = lsm;
413         set->set_lockh = lov_llh_new(lsm);
414         if (set->set_lockh == NULL)
415                 GOTO(out_set, rc = -ENOMEM);
416         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
417
418         for (i = 0; i < lsm->lsm_stripe_count; i++){
419                 struct lov_oinfo *loi;
420                 struct lov_request *req;
421                 obd_off start, end;
422
423                 loi = lsm->lsm_oinfo[i];
424                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
425                                            policy->l_extent.end, &start, &end))
426                         continue;
427
428                 /* FIXME raid1 should grace this error */
429                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
430                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
431                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
432                         GOTO(out_set, rc = -EIO);
433                 }
434
435                 OBD_ALLOC(req, sizeof(*req));
436                 if (req == NULL)
437                         GOTO(out_set, rc = -ENOMEM);
438
439                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
440                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
441                 if (req->rq_oi.oi_md == NULL) {
442                         OBD_FREE(req, sizeof(*req));
443                         GOTO(out_set, rc = -ENOMEM);
444                 }
445
446                 req->rq_oi.oi_policy.l_extent.start = start;
447                 req->rq_oi.oi_policy.l_extent.end = end;
448                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
449
450                 req->rq_idx = loi->loi_ost_idx;
451                 req->rq_stripe = i;
452
453                 /* XXX LOV STACKING: submd should be from the subobj */
454                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
455                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
456                 req->rq_oi.oi_md->lsm_stripe_count = 0;
457
458                 lov_set_add_req(req, set);
459         }
460         if (!set->set_count)
461                 GOTO(out_set, rc = -EIO);
462         *reqset = set;
463         RETURN(rc);
464 out_set:
465         lov_fini_match_set(set, mode, 0);
466         RETURN(rc);
467 }
468
469 int lov_fini_cancel_set(struct lov_request_set *set)
470 {
471         int rc = 0;
472         ENTRY;
473
474         if (set == NULL)
475                 RETURN(0);
476
477         LASSERT(set->set_exp);
478         if (set->set_lockh)
479                 lov_llh_put(set->set_lockh);
480
481         lov_put_reqset(set);
482
483         RETURN(rc);
484 }
485
486 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
487                         struct lov_stripe_md *lsm, __u32 mode,
488                         struct lustre_handle *lockh,
489                         struct lov_request_set **reqset)
490 {
491         struct lov_request_set *set;
492         int i, rc = 0;
493         ENTRY;
494
495         OBD_ALLOC(set, sizeof(*set));
496         if (set == NULL)
497                 RETURN(-ENOMEM);
498         lov_init_set(set);
499
500         set->set_exp = exp;
501         set->set_oi = oinfo;
502         set->set_oi->oi_md = lsm;
503         set->set_lockh = lov_handle2llh(lockh);
504         if (set->set_lockh == NULL) {
505                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
506                 GOTO(out_set, rc = -EINVAL);
507         }
508         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
509
510         for (i = 0; i < lsm->lsm_stripe_count; i++){
511                 struct lov_request *req;
512                 struct lustre_handle *lov_lockhp;
513                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
514
515                 lov_lockhp = set->set_lockh->llh_handles + i;
516                 if (!lustre_handle_is_used(lov_lockhp)) {
517                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
518                                loi->loi_ost_idx, loi->loi_id);
519                         continue;
520                 }
521
522                 OBD_ALLOC(req, sizeof(*req));
523                 if (req == NULL)
524                         GOTO(out_set, rc = -ENOMEM);
525
526                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
527                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
528                 if (req->rq_oi.oi_md == NULL) {
529                         OBD_FREE(req, sizeof(*req));
530                         GOTO(out_set, rc = -ENOMEM);
531                 }
532
533                 req->rq_idx = loi->loi_ost_idx;
534                 req->rq_stripe = i;
535
536                 /* XXX LOV STACKING: submd should be from the subobj */
537                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
538                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
539                 req->rq_oi.oi_md->lsm_stripe_count = 0;
540
541                 lov_set_add_req(req, set);
542         }
543         if (!set->set_count)
544                 GOTO(out_set, rc = -EIO);
545         *reqset = set;
546         RETURN(rc);
547 out_set:
548         lov_fini_cancel_set(set);
549         RETURN(rc);
550 }
551
552 static int create_done(struct obd_export *exp, struct lov_request_set *set,
553                        struct lov_stripe_md **lsmp)
554 {
555         struct lov_obd *lov = &exp->exp_obd->u.lov;
556         struct obd_trans_info *oti = set->set_oti;
557         struct obdo *src_oa = set->set_oi->oi_oa;
558         struct lov_request *req;
559         struct obdo *ret_oa = NULL;
560         int attrset = 0, rc = 0;
561         ENTRY;
562
563         LASSERT(set->set_completes);
564
565         /* try alloc objects on other osts if osc_create fails for
566          * exceptions: RPC failure, ENOSPC, etc */
567         if (set->set_count != set->set_success) {
568                 list_for_each_entry (req, &set->set_list, rq_link) {
569                         if (req->rq_rc == 0)
570                                 continue;
571
572                         set->set_completes--;
573                         req->rq_complete = 0;
574
575                         rc = qos_remedy_create(set, req);
576                         lov_update_create_set(set, req, rc);
577                 }
578         }
579
580         /* no successful creates */
581         if (set->set_success == 0)
582                 GOTO(cleanup, rc);
583
584         if (set->set_count != set->set_success) {
585                 set->set_count = set->set_success;
586                 qos_shrink_lsm(set);
587         }
588
589         OBDO_ALLOC(ret_oa);
590         if (ret_oa == NULL)
591                 GOTO(cleanup, rc = -ENOMEM);
592
593         list_for_each_entry(req, &set->set_list, rq_link) {
594                 if (!req->rq_complete || req->rq_rc)
595                         continue;
596                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
597                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
598                                 req->rq_stripe, &attrset);
599         }
600         if (src_oa->o_valid & OBD_MD_FLSIZE &&
601             ret_oa->o_size != src_oa->o_size) {
602                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
603                        src_oa->o_size, ret_oa->o_size);
604                 LBUG();
605         }
606         ret_oa->o_id = src_oa->o_id;
607         ret_oa->o_gr = src_oa->o_gr;
608         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
609         memcpy(src_oa, ret_oa, sizeof(*src_oa));
610         OBDO_FREE(ret_oa);
611
612         *lsmp = set->set_oi->oi_md;
613         GOTO(done, rc = 0);
614
615 cleanup:
616         list_for_each_entry(req, &set->set_list, rq_link) {
617                 struct obd_export *sub_exp;
618                 int err = 0;
619
620                 if (!req->rq_complete || req->rq_rc)
621                         continue;
622
623                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
624                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
625                                   NULL);
626                 if (err)
627                         CERROR("Failed to uncreate objid "LPX64" subobj "
628                                LPX64" on OST idx %d: rc = %d\n",
629                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
630                                req->rq_idx, rc);
631         }
632         if (*lsmp == NULL)
633                 obd_free_memmd(exp, &set->set_oi->oi_md);
634 done:
635         if (oti && set->set_cookies) {
636                 oti->oti_logcookies = set->set_cookies;
637                 if (!set->set_cookie_sent) {
638                         oti_free_cookies(oti);
639                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
640                 } else {
641                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
642                 }
643         }
644         RETURN(rc);
645 }
646
647 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
648 {
649         int rc = 0;
650         ENTRY;
651
652         if (set == NULL)
653                 RETURN(0);
654         LASSERT(set->set_exp);
655         if (set->set_completes)
656                 rc = create_done(set->set_exp, set, lsmp);
657
658         lov_put_reqset(set);
659         RETURN(rc);
660 }
661
662 int lov_update_create_set(struct lov_request_set *set,
663                           struct lov_request *req, int rc)
664 {
665         struct obd_trans_info *oti = set->set_oti;
666         struct lov_stripe_md *lsm = set->set_oi->oi_md;
667         struct lov_oinfo *loi;
668         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
669         ENTRY;
670
671         if (rc && lov->lov_tgts[req->rq_idx] &&
672             lov->lov_tgts[req->rq_idx]->ltd_active) {
673                 CERROR("error creating fid "LPX64" sub-object"
674                        " on OST idx %d/%d: rc = %d\n",
675                        set->set_oi->oi_oa->o_id, req->rq_idx,
676                        lsm->lsm_stripe_count, rc);
677                 if (rc > 0) {
678                         CERROR("obd_create returned invalid err %d\n", rc);
679                         rc = -EIO;
680                 }
681         }
682
683         spin_lock(&set->set_lock);
684         req->rq_stripe = set->set_success;
685         loi = lsm->lsm_oinfo[req->rq_stripe];
686         if (rc) {
687                 lov_update_set(set, req, rc);
688                 spin_unlock(&set->set_lock);
689                 RETURN(rc);
690         }
691
692         loi->loi_id = req->rq_oi.oi_oa->o_id;
693         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
694         loi->loi_ost_idx = req->rq_idx;
695         loi_init(loi);
696
697         if (oti && set->set_cookies)
698                 ++oti->oti_logcookies;
699         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
700                 set->set_cookie_sent++;
701
702         lov_update_set(set, req, rc);
703         spin_unlock(&set->set_lock);
704
705         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
706                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
707         RETURN(rc);
708 }
709
710 int cb_create_update(void *cookie, int rc)
711 {
712         struct obd_info *oinfo = cookie;
713         struct lov_request *lovreq;
714
715         lovreq = container_of(oinfo, struct lov_request, rq_oi);
716         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
717         if (lov_finished_set(lovreq->rq_rqset))
718                 lov_put_reqset(lovreq->rq_rqset);
719         return rc;
720 }
721
722
723 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
724                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
725                         struct obd_trans_info *oti,
726                         struct lov_request_set **reqset)
727 {
728         struct lov_request_set *set;
729         int rc = 0;
730         ENTRY;
731
732         OBD_ALLOC(set, sizeof(*set));
733         if (set == NULL)
734                 RETURN(-ENOMEM);
735         lov_init_set(set);
736
737         set->set_exp = exp;
738         set->set_oi = oinfo;
739         set->set_oi->oi_md = *lsmp;
740         set->set_oi->oi_oa = src_oa;
741         set->set_oti = oti;
742         lov_get_reqset(set);
743
744         rc = qos_prep_create(exp, set);
745         /* qos_shrink_lsm() may have allocated a new lsm */
746         *lsmp = oinfo->oi_md;
747         if (rc) {
748                 lov_fini_create_set(set, lsmp);
749                 lov_put_reqset(set);
750         } else {
751                 *reqset = set;
752         }
753         RETURN(rc);
754 }
755
756 static int common_attr_done(struct lov_request_set *set)
757 {
758         struct list_head *pos;
759         struct lov_request *req;
760         struct obdo *tmp_oa;
761         int rc = 0, attrset = 0;
762         ENTRY;
763
764         LASSERT(set->set_oi != NULL);
765
766         if (set->set_oi->oi_oa == NULL)
767                 RETURN(0);
768
769         if (!set->set_success)
770                 RETURN(-EIO);
771
772         OBDO_ALLOC(tmp_oa);
773         if (tmp_oa == NULL)
774                 GOTO(out, rc = -ENOMEM);
775
776         list_for_each (pos, &set->set_list) {
777                 req = list_entry(pos, struct lov_request, rq_link);
778
779                 if (!req->rq_complete || req->rq_rc)
780                         continue;
781                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
782                         continue;
783                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
784                                 req->rq_oi.oi_oa->o_valid,
785                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
786         }
787         if (!attrset) {
788                 CERROR("No stripes had valid attrs\n");
789                 rc = -EIO;
790         }
791         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
792         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
793 out:
794         if (tmp_oa)
795                 OBDO_FREE(tmp_oa);
796         RETURN(rc);
797
798 }
799
800 static int brw_done(struct lov_request_set *set)
801 {
802         struct lov_stripe_md *lsm = set->set_oi->oi_md;
803         struct lov_oinfo     *loi = NULL;
804         struct list_head *pos;
805         struct lov_request *req;
806         ENTRY;
807
808         list_for_each (pos, &set->set_list) {
809                 req = list_entry(pos, struct lov_request, rq_link);
810
811                 if (!req->rq_complete || req->rq_rc)
812                         continue;
813
814                 loi = lsm->lsm_oinfo[req->rq_stripe];
815
816                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
817                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
818         }
819
820         RETURN(0);
821 }
822
823 int lov_fini_brw_set(struct lov_request_set *set)
824 {
825         int rc = 0;
826         ENTRY;
827
828         if (set == NULL)
829                 RETURN(0);
830         LASSERT(set->set_exp);
831         if (set->set_completes) {
832                 rc = brw_done(set);
833                 /* FIXME update qos data here */
834         }
835         lov_put_reqset(set);
836
837         RETURN(rc);
838 }
839
840 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
841                      obd_count oa_bufs, struct brw_page *pga,
842                      struct obd_trans_info *oti,
843                      struct lov_request_set **reqset)
844 {
845         struct {
846                 obd_count       index;
847                 obd_count       count;
848                 obd_count       off;
849         } *info = NULL;
850         struct lov_request_set *set;
851         struct lov_obd *lov = &exp->exp_obd->u.lov;
852         int rc = 0, i, shift;
853         ENTRY;
854
855         OBD_ALLOC(set, sizeof(*set));
856         if (set == NULL)
857                 RETURN(-ENOMEM);
858         lov_init_set(set);
859
860         set->set_exp = exp;
861         set->set_oti = oti;
862         set->set_oi = oinfo;
863         set->set_oabufs = oa_bufs;
864         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
865         if (!set->set_pga)
866                 GOTO(out, rc = -ENOMEM);
867
868         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
869         if (!info)
870                 GOTO(out, rc = -ENOMEM);
871
872         /* calculate the page count for each stripe */
873         for (i = 0; i < oa_bufs; i++) {
874                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
875                 info[stripe].count++;
876         }
877
878         /* alloc and initialize lov request */
879         shift = 0;
880         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
881                 struct lov_oinfo *loi = NULL;
882                 struct lov_request *req;
883
884                 if (info[i].count == 0)
885                         continue;
886
887                 loi = oinfo->oi_md->lsm_oinfo[i];
888                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
889                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
890                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
891                         GOTO(out, rc = -EIO);
892                 }
893
894                 OBD_ALLOC(req, sizeof(*req));
895                 if (req == NULL)
896                         GOTO(out, rc = -ENOMEM);
897
898                 OBDO_ALLOC(req->rq_oi.oi_oa);
899                 if (req->rq_oi.oi_oa == NULL) {
900                         OBD_FREE(req, sizeof(*req));
901                         GOTO(out, rc = -ENOMEM);
902                 }
903
904                 if (oinfo->oi_oa) {
905                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
906                                sizeof(*req->rq_oi.oi_oa));
907                 }
908                 req->rq_oi.oi_oa->o_id = loi->loi_id;
909                 req->rq_oi.oi_oa->o_stripe_idx = i;
910
911                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
912                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
913                 if (req->rq_oi.oi_md == NULL) {
914                         OBDO_FREE(req->rq_oi.oi_oa);
915                         OBD_FREE(req, sizeof(*req));
916                         GOTO(out, rc = -ENOMEM);
917                 }
918
919                 req->rq_idx = loi->loi_ost_idx;
920                 req->rq_stripe = i;
921
922                 /* XXX LOV STACKING */
923                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
924                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
925                 req->rq_oabufs = info[i].count;
926                 req->rq_pgaidx = shift;
927                 shift += req->rq_oabufs;
928
929                 /* remember the index for sort brw_page array */
930                 info[i].index = req->rq_pgaidx;
931
932                 req->rq_oi.oi_capa = oinfo->oi_capa;
933
934                 lov_set_add_req(req, set);
935         }
936         if (!set->set_count)
937                 GOTO(out, rc = -EIO);
938
939         /* rotate & sort the brw_page array */
940         for (i = 0; i < oa_bufs; i++) {
941                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
942
943                 shift = info[stripe].index + info[stripe].off;
944                 LASSERT(shift < oa_bufs);
945                 set->set_pga[shift] = pga[i];
946                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
947                                   &set->set_pga[shift].off);
948                 info[stripe].off++;
949         }
950 out:
951         if (info)
952                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
953
954         if (rc == 0)
955                 *reqset = set;
956         else
957                 lov_fini_brw_set(set);
958
959         RETURN(rc);
960 }
961
962 int lov_fini_getattr_set(struct lov_request_set *set)
963 {
964         int rc = 0;
965         ENTRY;
966
967         if (set == NULL)
968                 RETURN(0);
969         LASSERT(set->set_exp);
970         if (set->set_completes)
971                 rc = common_attr_done(set);
972
973         lov_put_reqset(set);
974
975         RETURN(rc);
976 }
977
978 /* The callback for osc_getattr_async that finilizes a request info when a
979  * response is received. */
980 static int cb_getattr_update(void *cookie, int rc)
981 {
982         struct obd_info *oinfo = cookie;
983         struct lov_request *lovreq;
984         lovreq = container_of(oinfo, struct lov_request, rq_oi);
985         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
986 }
987
988 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
989                          struct lov_request_set **reqset)
990 {
991         struct lov_request_set *set;
992         struct lov_obd *lov = &exp->exp_obd->u.lov;
993         int rc = 0, i;
994         ENTRY;
995
996         OBD_ALLOC(set, sizeof(*set));
997         if (set == NULL)
998                 RETURN(-ENOMEM);
999         lov_init_set(set);
1000
1001         set->set_exp = exp;
1002         set->set_oi = oinfo;
1003
1004         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1005                 struct lov_oinfo *loi;
1006                 struct lov_request *req;
1007
1008                 loi = oinfo->oi_md->lsm_oinfo[i];
1009                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1010                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1011                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1012                         continue;
1013                 }
1014
1015                 OBD_ALLOC(req, sizeof(*req));
1016                 if (req == NULL)
1017                         GOTO(out_set, rc = -ENOMEM);
1018
1019                 req->rq_stripe = i;
1020                 req->rq_idx = loi->loi_ost_idx;
1021
1022                 OBDO_ALLOC(req->rq_oi.oi_oa);
1023                 if (req->rq_oi.oi_oa == NULL) {
1024                         OBD_FREE(req, sizeof(*req));
1025                         GOTO(out_set, rc = -ENOMEM);
1026                 }
1027                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1028                        sizeof(*req->rq_oi.oi_oa));
1029                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1030                 req->rq_oi.oi_cb_up = cb_getattr_update;
1031                 req->rq_oi.oi_capa = oinfo->oi_capa;
1032
1033                 lov_set_add_req(req, set);
1034         }
1035         if (!set->set_count)
1036                 GOTO(out_set, rc = -EIO);
1037         *reqset = set;
1038         RETURN(rc);
1039 out_set:
1040         lov_fini_getattr_set(set);
1041         RETURN(rc);
1042 }
1043
1044 int lov_fini_destroy_set(struct lov_request_set *set)
1045 {
1046         ENTRY;
1047
1048         if (set == NULL)
1049                 RETURN(0);
1050         LASSERT(set->set_exp);
1051         if (set->set_completes) {
1052                 /* FIXME update qos data here */
1053         }
1054
1055         lov_put_reqset(set);
1056
1057         RETURN(0);
1058 }
1059
1060 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1061                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1062                          struct obd_trans_info *oti,
1063                          struct lov_request_set **reqset)
1064 {
1065         struct lov_request_set *set;
1066         struct lov_obd *lov = &exp->exp_obd->u.lov;
1067         int rc = 0, i;
1068         ENTRY;
1069
1070         OBD_ALLOC(set, sizeof(*set));
1071         if (set == NULL)
1072                 RETURN(-ENOMEM);
1073         lov_init_set(set);
1074
1075         set->set_exp = exp;
1076         set->set_oi = oinfo;
1077         set->set_oi->oi_md = lsm;
1078         set->set_oi->oi_oa = src_oa;
1079         set->set_oti = oti;
1080         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1081                 set->set_cookies = oti->oti_logcookies;
1082
1083         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1084                 struct lov_oinfo *loi;
1085                 struct lov_request *req;
1086
1087                 loi = lsm->lsm_oinfo[i];
1088                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1089                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1090                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1091                         continue;
1092                 }
1093
1094                 OBD_ALLOC(req, sizeof(*req));
1095                 if (req == NULL)
1096                         GOTO(out_set, rc = -ENOMEM);
1097
1098                 req->rq_stripe = i;
1099                 req->rq_idx = loi->loi_ost_idx;
1100
1101                 OBDO_ALLOC(req->rq_oi.oi_oa);
1102                 if (req->rq_oi.oi_oa == NULL) {
1103                         OBD_FREE(req, sizeof(*req));
1104                         GOTO(out_set, rc = -ENOMEM);
1105                 }
1106                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1107                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1108                 lov_set_add_req(req, set);
1109         }
1110         if (!set->set_count)
1111                 GOTO(out_set, rc = -EIO);
1112         *reqset = set;
1113         RETURN(rc);
1114 out_set:
1115         lov_fini_destroy_set(set);
1116         RETURN(rc);
1117 }
1118
1119 int lov_fini_setattr_set(struct lov_request_set *set)
1120 {
1121         int rc = 0;
1122         ENTRY;
1123
1124         if (set == NULL)
1125                 RETURN(0);
1126         LASSERT(set->set_exp);
1127         if (set->set_completes) {
1128                 rc = common_attr_done(set);
1129                 /* FIXME update qos data here */
1130         }
1131
1132         lov_put_reqset(set);
1133         RETURN(rc);
1134 }
1135
1136 int lov_update_setattr_set(struct lov_request_set *set,
1137                            struct lov_request *req, int rc)
1138 {
1139         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1140         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1141         ENTRY;
1142
1143         lov_update_set(set, req, rc);
1144
1145         /* grace error on inactive ost */
1146         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1147                     lov->lov_tgts[req->rq_idx]->ltd_active))
1148                 rc = 0;
1149
1150         if (rc == 0) {
1151                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1152                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1153                                 req->rq_oi.oi_oa->o_ctime;
1154                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1155                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1156                                 req->rq_oi.oi_oa->o_mtime;
1157                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1158                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1159                                 req->rq_oi.oi_oa->o_atime;
1160         }
1161
1162         RETURN(rc);
1163 }
1164
1165 /* The callback for osc_setattr_async that finilizes a request info when a
1166  * response is received. */
1167 static int cb_setattr_update(void *cookie, int rc)
1168 {
1169         struct obd_info *oinfo = cookie;
1170         struct lov_request *lovreq;
1171         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1172         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1173 }
1174
1175 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1176                          struct obd_trans_info *oti,
1177                          struct lov_request_set **reqset)
1178 {
1179         struct lov_request_set *set;
1180         struct lov_obd *lov = &exp->exp_obd->u.lov;
1181         int rc = 0, i;
1182         ENTRY;
1183
1184         OBD_ALLOC(set, sizeof(*set));
1185         if (set == NULL)
1186                 RETURN(-ENOMEM);
1187         lov_init_set(set);
1188
1189         set->set_exp = exp;
1190         set->set_oti = oti;
1191         set->set_oi = oinfo;
1192         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1193                 set->set_cookies = oti->oti_logcookies;
1194
1195         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1196                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1197                 struct lov_request *req;
1198
1199                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1200                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1201                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1202                         continue;
1203                 }
1204
1205                 OBD_ALLOC(req, sizeof(*req));
1206                 if (req == NULL)
1207                         GOTO(out_set, rc = -ENOMEM);
1208                 req->rq_stripe = i;
1209                 req->rq_idx = loi->loi_ost_idx;
1210
1211                 OBDO_ALLOC(req->rq_oi.oi_oa);
1212                 if (req->rq_oi.oi_oa == NULL) {
1213                         OBD_FREE(req, sizeof(*req));
1214                         GOTO(out_set, rc = -ENOMEM);
1215                 }
1216                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1217                        sizeof(*req->rq_oi.oi_oa));
1218                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1219                 req->rq_oi.oi_oa->o_stripe_idx = i;
1220                 req->rq_oi.oi_cb_up = cb_setattr_update;
1221                 req->rq_oi.oi_capa = oinfo->oi_capa;
1222
1223                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1224                         int off = lov_stripe_offset(oinfo->oi_md,
1225                                                     oinfo->oi_oa->o_size, i,
1226                                                     &req->rq_oi.oi_oa->o_size);
1227
1228                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1229                                 req->rq_oi.oi_oa->o_size--;
1230
1231                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1232                                i, req->rq_oi.oi_oa->o_size,
1233                                oinfo->oi_oa->o_size);
1234                 }
1235                 lov_set_add_req(req, set);
1236         }
1237         if (!set->set_count)
1238                 GOTO(out_set, rc = -EIO);
1239         *reqset = set;
1240         RETURN(rc);
1241 out_set:
1242         lov_fini_setattr_set(set);
1243         RETURN(rc);
1244 }
1245
1246 int lov_fini_punch_set(struct lov_request_set *set)
1247 {
1248         int rc = 0;
1249         ENTRY;
1250
1251         if (set == NULL)
1252                 RETURN(0);
1253         LASSERT(set->set_exp);
1254         if (set->set_completes) {
1255                 rc = -EIO;
1256                 /* FIXME update qos data here */
1257                 if (set->set_success)
1258                         rc = common_attr_done(set);
1259         }
1260
1261         lov_put_reqset(set);
1262
1263         RETURN(rc);
1264 }
1265
1266 int lov_update_punch_set(struct lov_request_set *set,
1267                          struct lov_request *req, int rc)
1268 {
1269         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1270         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1271         ENTRY;
1272
1273         lov_update_set(set, req, rc);
1274
1275         /* grace error on inactive ost */
1276         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1277                 rc = 0;
1278
1279         if (rc == 0) {
1280                 lov_stripe_lock(lsm);
1281                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1282                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1283                                 req->rq_oi.oi_oa->o_blocks;
1284                 }
1285
1286                 /* Do we need to update lvb_size here? It needn't because
1287                  * it have been done in ll_truncate(). -jay */
1288                 lov_stripe_unlock(lsm);
1289         }
1290
1291         RETURN(rc);
1292 }
1293
1294 /* The callback for osc_punch that finilizes a request info when a response
1295  * is received. */
1296 static int cb_update_punch(void *cookie, int rc)
1297 {
1298         struct obd_info *oinfo = cookie;
1299         struct lov_request *lovreq;
1300         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1301         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1302 }
1303
1304 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1305                        struct obd_trans_info *oti,
1306                        struct lov_request_set **reqset)
1307 {
1308         struct lov_request_set *set;
1309         struct lov_obd *lov = &exp->exp_obd->u.lov;
1310         int rc = 0, i;
1311         ENTRY;
1312
1313         OBD_ALLOC(set, sizeof(*set));
1314         if (set == NULL)
1315                 RETURN(-ENOMEM);
1316         lov_init_set(set);
1317
1318         set->set_oi = oinfo;
1319         set->set_exp = exp;
1320
1321         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1322                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1323                 struct lov_request *req;
1324                 obd_off rs, re;
1325
1326                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1327                                            oinfo->oi_policy.l_extent.start,
1328                                            oinfo->oi_policy.l_extent.end,
1329                                            &rs, &re))
1330                         continue;
1331
1332                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1333                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1334                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1335                         GOTO(out_set, rc = -EIO);
1336                 }
1337
1338                 OBD_ALLOC(req, sizeof(*req));
1339                 if (req == NULL)
1340                         GOTO(out_set, rc = -ENOMEM);
1341                 req->rq_stripe = i;
1342                 req->rq_idx = loi->loi_ost_idx;
1343
1344                 OBDO_ALLOC(req->rq_oi.oi_oa);
1345                 if (req->rq_oi.oi_oa == NULL) {
1346                         OBD_FREE(req, sizeof(*req));
1347                         GOTO(out_set, rc = -ENOMEM);
1348                 }
1349                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1350                        sizeof(*req->rq_oi.oi_oa));
1351                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1352                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1353                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1354
1355                 req->rq_oi.oi_oa->o_stripe_idx = i;
1356                 req->rq_oi.oi_cb_up = cb_update_punch;
1357
1358                 req->rq_oi.oi_policy.l_extent.start = rs;
1359                 req->rq_oi.oi_policy.l_extent.end = re;
1360                 req->rq_oi.oi_policy.l_extent.gid = -1;
1361
1362                 req->rq_oi.oi_capa = oinfo->oi_capa;
1363
1364                 lov_set_add_req(req, set);
1365         }
1366         if (!set->set_count)
1367                 GOTO(out_set, rc = -EIO);
1368         *reqset = set;
1369         RETURN(rc);
1370 out_set:
1371         lov_fini_punch_set(set);
1372         RETURN(rc);
1373 }
1374
1375 int lov_fini_sync_set(struct lov_request_set *set)
1376 {
1377         int rc = 0;
1378         ENTRY;
1379
1380         if (set == NULL)
1381                 RETURN(0);
1382         LASSERT(set->set_exp);
1383         if (set->set_completes) {
1384                 if (!set->set_success)
1385                         rc = -EIO;
1386                 /* FIXME update qos data here */
1387         }
1388
1389         lov_put_reqset(set);
1390
1391         RETURN(rc);
1392 }
1393
1394 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1395                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1396                       obd_off start, obd_off end,
1397                       struct lov_request_set **reqset)
1398 {
1399         struct lov_request_set *set;
1400         struct lov_obd *lov = &exp->exp_obd->u.lov;
1401         int rc = 0, i;
1402         ENTRY;
1403
1404         OBD_ALLOC(set, sizeof(*set));
1405         if (set == NULL)
1406                 RETURN(-ENOMEM);
1407         lov_init_set(set);
1408
1409         set->set_exp = exp;
1410         set->set_oi = oinfo;
1411         set->set_oi->oi_md = lsm;
1412         set->set_oi->oi_oa = src_oa;
1413
1414         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1415                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1416                 struct lov_request *req;
1417                 obd_off rs, re;
1418
1419                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1420                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1421                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1422                         continue;
1423                 }
1424
1425                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1426                         continue;
1427
1428                 OBD_ALLOC(req, sizeof(*req));
1429                 if (req == NULL)
1430                         GOTO(out_set, rc = -ENOMEM);
1431                 req->rq_stripe = i;
1432                 req->rq_idx = loi->loi_ost_idx;
1433
1434                 OBDO_ALLOC(req->rq_oi.oi_oa);
1435                 if (req->rq_oi.oi_oa == NULL) {
1436                         OBD_FREE(req, sizeof(*req));
1437                         GOTO(out_set, rc = -ENOMEM);
1438                 }
1439                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1440                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1441                 req->rq_oi.oi_oa->o_stripe_idx = i;
1442
1443                 req->rq_oi.oi_policy.l_extent.start = rs;
1444                 req->rq_oi.oi_policy.l_extent.end = re;
1445                 req->rq_oi.oi_policy.l_extent.gid = -1;
1446
1447                 lov_set_add_req(req, set);
1448         }
1449         if (!set->set_count)
1450                 GOTO(out_set, rc = -EIO);
1451         *reqset = set;
1452         RETURN(rc);
1453 out_set:
1454         lov_fini_sync_set(set);
1455         RETURN(rc);
1456 }
1457
1458 #define LOV_U64_MAX ((__u64)~0ULL)
1459 #define LOV_SUM_MAX(tot, add)                                           \
1460         do {                                                            \
1461                 if ((tot) + (add) < (tot))                              \
1462                         (tot) = LOV_U64_MAX;                            \
1463                 else                                                    \
1464                         (tot) += (add);                                 \
1465         } while(0)
1466
1467 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1468 {
1469         ENTRY;
1470
1471         if (success) {
1472                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1473
1474                 if (osfs->os_files != LOV_U64_MAX)
1475                         do_div(osfs->os_files, expected_stripes);
1476                 if (osfs->os_ffree != LOV_U64_MAX)
1477                         do_div(osfs->os_ffree, expected_stripes);
1478
1479                 spin_lock(&obd->obd_osfs_lock);
1480                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1481                 obd->obd_osfs_age = cfs_time_current_64();
1482                 spin_unlock(&obd->obd_osfs_lock);
1483                 RETURN(0);
1484         }
1485
1486         RETURN(-EIO);
1487 }
1488
1489 int lov_fini_statfs_set(struct lov_request_set *set)
1490 {
1491         int rc = 0;
1492         ENTRY;
1493
1494         if (set == NULL)
1495                 RETURN(0);
1496
1497         if (set->set_completes) {
1498                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1499                                      set->set_success);
1500         }
1501         lov_put_reqset(set);
1502         RETURN(rc);
1503 }
1504
1505 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1506                        int success)
1507 {
1508         int shift = 0, quit = 0;
1509         __u64 tmp;
1510
1511         if (success == 0) {
1512                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1513         } else {
1514                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1515                         /* assume all block sizes are always powers of 2 */
1516                         /* get the bits difference */
1517                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1518                         for (shift = 0; shift <= 64; ++shift) {
1519                                 if (tmp & 1) {
1520                                         if (quit)
1521                                                 break;
1522                                         else
1523                                                 quit = 1;
1524                                         shift = 0;
1525                                 }
1526                                 tmp >>= 1;
1527                         }
1528                 }
1529
1530                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1531                         osfs->os_bsize = lov_sfs->os_bsize;
1532
1533                         osfs->os_bfree  >>= shift;
1534                         osfs->os_bavail >>= shift;
1535                         osfs->os_blocks >>= shift;
1536                 } else if (shift != 0) {
1537                         lov_sfs->os_bfree  >>= shift;
1538                         lov_sfs->os_bavail >>= shift;
1539                         lov_sfs->os_blocks >>= shift;
1540                 }
1541 #ifdef MIN_DF
1542                 /* Sandia requested that df (and so, statfs) only
1543                    returned minimal available space on
1544                    a single OST, so people would be able to
1545                    write this much data guaranteed. */
1546                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1547                         /* Presumably if new bavail is smaller,
1548                            new bfree is bigger as well */
1549                         osfs->os_bfree = lov_sfs->os_bfree;
1550                         osfs->os_bavail = lov_sfs->os_bavail;
1551                 }
1552 #else
1553                 osfs->os_bfree += lov_sfs->os_bfree;
1554                 osfs->os_bavail += lov_sfs->os_bavail;
1555 #endif
1556                 osfs->os_blocks += lov_sfs->os_blocks;
1557                 /* XXX not sure about this one - depends on policy.
1558                  *   - could be minimum if we always stripe on all OBDs
1559                  *     (but that would be wrong for any other policy,
1560                  *     if one of the OBDs has no more objects left)
1561                  *   - could be sum if we stripe whole objects
1562                  *   - could be average, just to give a nice number
1563                  *
1564                  * To give a "reasonable" (if not wholly accurate)
1565                  * number, we divide the total number of free objects
1566                  * by expected stripe count (watch out for overflow).
1567                  */
1568                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1569                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1570         }
1571 }
1572
1573 /* The callback for osc_statfs_async that finilizes a request info when a
1574  * response is received. */
1575 static int cb_statfs_update(void *cookie, int rc)
1576 {
1577         struct obd_info *oinfo = cookie;
1578         struct lov_request *lovreq;
1579         struct obd_statfs *osfs, *lov_sfs;
1580         struct lov_obd *lov;
1581         struct lov_tgt_desc *tgt;
1582         struct obd_device *lovobd, *tgtobd;
1583         int success;
1584         ENTRY;
1585
1586         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1587         lovobd = lovreq->rq_rqset->set_obd;
1588         lov = &lovobd->u.lov;
1589         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1590         lov_sfs = oinfo->oi_osfs;
1591         success = lovreq->rq_rqset->set_success;
1592         /* XXX: the same is done in lov_update_common_set, however
1593            lovset->set_exp is not initialized. */
1594         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1595         if (rc)
1596                 GOTO(out, rc);
1597  
1598         obd_getref(lovobd);
1599         tgt = lov->lov_tgts[lovreq->rq_idx];
1600         if (!tgt || !tgt->ltd_active)
1601                 GOTO(out_update, rc);
1602
1603         tgtobd = class_exp2obd(tgt->ltd_exp);
1604         spin_lock(&tgtobd->obd_osfs_lock);
1605         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1606         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1607                 tgtobd->obd_osfs_age = cfs_time_current_64();
1608         spin_unlock(&tgtobd->obd_osfs_lock);
1609
1610 out_update:
1611         lov_update_statfs(osfs, lov_sfs, success);
1612         qos_update(lov);
1613         obd_putref(lovobd);
1614
1615 out:
1616         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1617             lov_finished_set(lovreq->rq_rqset)) {
1618                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1619                                     lovreq->rq_rqset->set_success !=
1620                                                   lovreq->rq_rqset->set_count);
1621                qos_statfs_done(lov);
1622         }
1623
1624         RETURN(0);
1625 }
1626
1627 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1628                         struct lov_request_set **reqset)
1629 {
1630         struct lov_request_set *set;
1631         struct lov_obd *lov = &obd->u.lov;
1632         int rc = 0, i;
1633         ENTRY;
1634
1635         OBD_ALLOC(set, sizeof(*set));
1636         if (set == NULL)
1637                 RETURN(-ENOMEM);
1638         lov_init_set(set);
1639
1640         set->set_obd = obd;
1641         set->set_oi = oinfo;
1642
1643         /* We only get block data from the OBD */
1644         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1645                 struct lov_request *req;
1646
1647                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1648                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1649                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1650                         continue;
1651                 }
1652
1653                 /* skip targets that have been explicitely disabled by the
1654                  * administrator */
1655                 if (!lov->lov_tgts[i]->ltd_exp) {
1656                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1657                         continue;
1658                 }
1659
1660                 OBD_ALLOC(req, sizeof(*req));
1661                 if (req == NULL)
1662                         GOTO(out_set, rc = -ENOMEM);
1663
1664                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1665                 if (req->rq_oi.oi_osfs == NULL) {
1666                         OBD_FREE(req, sizeof(*req));
1667                         GOTO(out_set, rc = -ENOMEM);
1668                 }
1669
1670                 req->rq_idx = i;
1671                 req->rq_oi.oi_cb_up = cb_statfs_update;
1672                 req->rq_oi.oi_flags = oinfo->oi_flags;
1673
1674                 lov_set_add_req(req, set);
1675         }
1676         if (!set->set_count)
1677                 GOTO(out_set, rc = -EIO);
1678         *reqset = set;
1679         RETURN(rc);
1680 out_set:
1681         lov_fini_statfs_set(set);
1682         RETURN(rc);
1683 }