Whamcloud - gitweb
LU-1770 ptlrpc: introducing OBD_CONNECT_FLOCK_OWNER flag
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #ifdef __KERNEL__
40 #include <libcfs/libcfs.h>
41 #else
42 #include <liblustre.h>
43 #endif
44
45 #include <obd_class.h>
46 #include <obd_lov.h>
47 #include <lustre/lustre_idl.h>
48
49 #include "lov_internal.h"
50
51 static void lov_init_set(struct lov_request_set *set)
52 {
53         set->set_count = 0;
54         cfs_atomic_set(&set->set_completes, 0);
55         cfs_atomic_set(&set->set_success, 0);
56         cfs_atomic_set(&set->set_finish_checked, 0);
57         set->set_cookies = 0;
58         CFS_INIT_LIST_HEAD(&set->set_list);
59         cfs_atomic_set(&set->set_refcount, 1);
60         cfs_waitq_init(&set->set_waitq);
61         cfs_spin_lock_init(&set->set_lock);
62 }
63
64 void lov_finish_set(struct lov_request_set *set)
65 {
66         cfs_list_t *pos, *n;
67         ENTRY;
68
69         LASSERT(set);
70         cfs_list_for_each_safe(pos, n, &set->set_list) {
71                 struct lov_request *req = cfs_list_entry(pos,
72                                                          struct lov_request,
73                                                          rq_link);
74                 cfs_list_del_init(&req->rq_link);
75
76                 if (req->rq_oi.oi_oa)
77                         OBDO_FREE(req->rq_oi.oi_oa);
78                 if (req->rq_oi.oi_md)
79                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
80                 if (req->rq_oi.oi_osfs)
81                         OBD_FREE(req->rq_oi.oi_osfs,
82                                  sizeof(*req->rq_oi.oi_osfs));
83                 OBD_FREE(req, sizeof(*req));
84         }
85
86         if (set->set_pga) {
87                 int len = set->set_oabufs * sizeof(*set->set_pga);
88                 OBD_FREE_LARGE(set->set_pga, len);
89         }
90         if (set->set_lockh)
91                 lov_llh_put(set->set_lockh);
92
93         OBD_FREE(set, sizeof(*set));
94         EXIT;
95 }
96
97 int lov_set_finished(struct lov_request_set *set, int idempotent)
98 {
99         int completes = cfs_atomic_read(&set->set_completes);
100
101         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
102
103         if (completes == set->set_count) {
104                 if (idempotent)
105                         return 1;
106                 if (cfs_atomic_inc_return(&set->set_finish_checked) == 1)
107                         return 1;
108         }
109         return 0;
110 }
111
112 void lov_update_set(struct lov_request_set *set,
113                     struct lov_request *req, int rc)
114 {
115         req->rq_complete = 1;
116         req->rq_rc = rc;
117
118         cfs_atomic_inc(&set->set_completes);
119         if (rc == 0)
120                 cfs_atomic_inc(&set->set_success);
121
122         cfs_waitq_signal(&set->set_waitq);
123 }
124
125 int lov_update_common_set(struct lov_request_set *set,
126                           struct lov_request *req, int rc)
127 {
128         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
129         ENTRY;
130
131         lov_update_set(set, req, rc);
132
133         /* grace error on inactive ost */
134         if (rc && !(lov->lov_tgts[req->rq_idx] &&
135                     lov->lov_tgts[req->rq_idx]->ltd_active))
136                 rc = 0;
137
138         /* FIXME in raid1 regime, should return 0 */
139         RETURN(rc);
140 }
141
142 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
143 {
144         cfs_list_add_tail(&req->rq_link, &set->set_list);
145         set->set_count++;
146         req->rq_rqset = set;
147 }
148
149 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
150                                struct lov_oinfo *loi, int flags,
151                                struct ost_lvb *lvb, __u32 mode, int rc);
152
153 static int lov_update_enqueue_lov(struct obd_export *exp,
154                                   struct lustre_handle *lov_lockhp,
155                                   struct lov_oinfo *loi, int flags, int idx,
156                                   __u64 oid, int rc)
157 {
158         struct lov_obd *lov = &exp->exp_obd->u.lov;
159
160         if (rc != ELDLM_OK &&
161             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
162                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
163                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
164                         /* -EUSERS used by OST to report file contention */
165                         if (rc != -EINTR && rc != -EUSERS)
166                                 CERROR("enqueue objid "LPX64" subobj "
167                                        LPX64" on OST idx %d: rc %d\n",
168                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
169                 } else
170                         rc = ELDLM_OK;
171         }
172         return rc;
173 }
174
175 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
176 {
177         struct lov_request_set *set = req->rq_rqset;
178         struct lustre_handle *lov_lockhp;
179         struct obd_info *oi = set->set_oi;
180         struct lov_oinfo *loi;
181         ENTRY;
182
183         LASSERT(oi != NULL);
184
185         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
186         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
187
188         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
189          * and that copy can be arbitrarily out of date.
190          *
191          * The LOV API is due for a serious rewriting anyways, and this
192          * can be addressed then. */
193
194         lov_stripe_lock(oi->oi_md);
195         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
196                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
197         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
198                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
199         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
200                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
201         lov_stripe_unlock(oi->oi_md);
202         lov_update_set(set, req, rc);
203         RETURN(rc);
204 }
205
206 /* The callback for osc_enqueue that updates lov info for every OSC request. */
207 static int cb_update_enqueue(void *cookie, int rc)
208 {
209         struct obd_info *oinfo = cookie;
210         struct ldlm_enqueue_info *einfo;
211         struct lov_request *lovreq;
212
213         lovreq = container_of(oinfo, struct lov_request, rq_oi);
214         einfo = lovreq->rq_rqset->set_ei;
215         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
216 }
217
218 static int enqueue_done(struct lov_request_set *set, __u32 mode)
219 {
220         struct lov_request *req;
221         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
222         int completes = cfs_atomic_read(&set->set_completes);
223         int rc = 0;
224         ENTRY;
225
226         /* enqueue/match success, just return */
227         if (completes && completes == cfs_atomic_read(&set->set_success))
228                 RETURN(0);
229
230         /* cancel enqueued/matched locks */
231         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
232                 struct lustre_handle *lov_lockhp;
233
234                 if (!req->rq_complete || req->rq_rc)
235                         continue;
236
237                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
238                 LASSERT(lov_lockhp);
239                 if (!lustre_handle_is_used(lov_lockhp))
240                         continue;
241
242                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
243                                 req->rq_oi.oi_md, mode, lov_lockhp);
244                 if (rc && lov->lov_tgts[req->rq_idx] &&
245                     lov->lov_tgts[req->rq_idx]->ltd_active)
246                         CERROR("cancelling obdjid "LPX64" on OST "
247                                "idx %d error: rc = %d\n",
248                                req->rq_oi.oi_md->lsm_object_id,
249                                req->rq_idx, rc);
250         }
251         if (set->set_lockh)
252                 lov_llh_put(set->set_lockh);
253         RETURN(rc);
254 }
255
256 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
257                          struct ptlrpc_request_set *rqset)
258 {
259         int ret = 0;
260         ENTRY;
261
262         if (set == NULL)
263                 RETURN(0);
264         LASSERT(set->set_exp);
265         /* Do enqueue_done only for sync requests and if any request
266          * succeeded. */
267         if (!rqset) {
268                 if (rc)
269                         cfs_atomic_set(&set->set_completes, 0);
270                 ret = enqueue_done(set, mode);
271         } else if (set->set_lockh)
272                 lov_llh_put(set->set_lockh);
273
274         lov_put_reqset(set);
275
276         RETURN(rc ? rc : ret);
277 }
278
279 static void lov_llh_addref(void *llhp)
280 {
281         struct lov_lock_handles *llh = llhp;
282
283         cfs_atomic_inc(&llh->llh_refcount);
284         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
285                cfs_atomic_read(&llh->llh_refcount));
286 }
287
288 static struct portals_handle_ops lov_handle_ops = {
289         .hop_addref = lov_llh_addref,
290         .hop_free   = NULL,
291 };
292
293 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
294 {
295         struct lov_lock_handles *llh;
296
297         OBD_ALLOC(llh, sizeof *llh +
298                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
299         if (llh == NULL)
300                 return NULL;
301
302         cfs_atomic_set(&llh->llh_refcount, 2);
303         llh->llh_stripe_count = lsm->lsm_stripe_count;
304         CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link);
305         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
306
307         return llh;
308 }
309
310 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
311                          struct ldlm_enqueue_info *einfo,
312                          struct lov_request_set **reqset)
313 {
314         struct lov_obd *lov = &exp->exp_obd->u.lov;
315         struct lov_request_set *set;
316         int i, rc = 0;
317         ENTRY;
318
319         OBD_ALLOC(set, sizeof(*set));
320         if (set == NULL)
321                 RETURN(-ENOMEM);
322         lov_init_set(set);
323
324         set->set_exp = exp;
325         set->set_oi = oinfo;
326         set->set_ei = einfo;
327         set->set_lockh = lov_llh_new(oinfo->oi_md);
328         if (set->set_lockh == NULL)
329                 GOTO(out_set, rc = -ENOMEM);
330         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
331
332         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
333                 struct lov_oinfo *loi;
334                 struct lov_request *req;
335                 obd_off start, end;
336
337                 loi = oinfo->oi_md->lsm_oinfo[i];
338                 if (!lov_stripe_intersects(oinfo->oi_md, i,
339                                            oinfo->oi_policy.l_extent.start,
340                                            oinfo->oi_policy.l_extent.end,
341                                            &start, &end))
342                         continue;
343
344                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
345                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
346                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
347                         continue;
348                 }
349
350                 OBD_ALLOC(req, sizeof(*req));
351                 if (req == NULL)
352                         GOTO(out_set, rc = -ENOMEM);
353
354                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
355                         sizeof(struct lov_oinfo *) +
356                         sizeof(struct lov_oinfo);
357                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
358                 if (req->rq_oi.oi_md == NULL) {
359                         OBD_FREE(req, sizeof(*req));
360                         GOTO(out_set, rc = -ENOMEM);
361                 }
362                 req->rq_oi.oi_md->lsm_oinfo[0] =
363                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
364                         sizeof(struct lov_oinfo *);
365
366                 /* Set lov request specific parameters. */
367                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
368                 req->rq_oi.oi_cb_up = cb_update_enqueue;
369                 req->rq_oi.oi_flags = oinfo->oi_flags;
370
371                 LASSERT(req->rq_oi.oi_lockh);
372
373                 req->rq_oi.oi_policy.l_extent.gid =
374                         oinfo->oi_policy.l_extent.gid;
375                 req->rq_oi.oi_policy.l_extent.start = start;
376                 req->rq_oi.oi_policy.l_extent.end = end;
377
378                 req->rq_idx = loi->loi_ost_idx;
379                 req->rq_stripe = i;
380
381                 /* XXX LOV STACKING: submd should be from the subobj */
382                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
383                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
384                 req->rq_oi.oi_md->lsm_stripe_count = 0;
385                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
386                         loi->loi_kms_valid;
387                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
388                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
389
390                 lov_set_add_req(req, set);
391         }
392         if (!set->set_count)
393                 GOTO(out_set, rc = -EIO);
394         *reqset = set;
395         RETURN(0);
396 out_set:
397         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
398         RETURN(rc);
399 }
400
401 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
402 {
403         int rc = 0;
404         ENTRY;
405
406         if (set == NULL)
407                 RETURN(0);
408         LASSERT(set->set_exp);
409         rc = enqueue_done(set, mode);
410         if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
411             (flags & LDLM_FL_TEST_LOCK))
412                 lov_llh_put(set->set_lockh);
413
414         lov_put_reqset(set);
415
416         RETURN(rc);
417 }
418
419 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
420                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
421                        __u32 mode, struct lustre_handle *lockh,
422                        struct lov_request_set **reqset)
423 {
424         struct lov_obd *lov = &exp->exp_obd->u.lov;
425         struct lov_request_set *set;
426         int i, rc = 0;
427         ENTRY;
428
429         OBD_ALLOC(set, sizeof(*set));
430         if (set == NULL)
431                 RETURN(-ENOMEM);
432         lov_init_set(set);
433
434         set->set_exp = exp;
435         set->set_oi = oinfo;
436         set->set_oi->oi_md = lsm;
437         set->set_lockh = lov_llh_new(lsm);
438         if (set->set_lockh == NULL)
439                 GOTO(out_set, rc = -ENOMEM);
440         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
441
442         for (i = 0; i < lsm->lsm_stripe_count; i++){
443                 struct lov_oinfo *loi;
444                 struct lov_request *req;
445                 obd_off start, end;
446
447                 loi = lsm->lsm_oinfo[i];
448                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
449                                            policy->l_extent.end, &start, &end))
450                         continue;
451
452                 /* FIXME raid1 should grace this error */
453                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
454                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
455                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
456                         GOTO(out_set, rc = -EIO);
457                 }
458
459                 OBD_ALLOC(req, sizeof(*req));
460                 if (req == NULL)
461                         GOTO(out_set, rc = -ENOMEM);
462
463                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
464                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
465                 if (req->rq_oi.oi_md == NULL) {
466                         OBD_FREE(req, sizeof(*req));
467                         GOTO(out_set, rc = -ENOMEM);
468                 }
469
470                 req->rq_oi.oi_policy.l_extent.start = start;
471                 req->rq_oi.oi_policy.l_extent.end = end;
472                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
473
474                 req->rq_idx = loi->loi_ost_idx;
475                 req->rq_stripe = i;
476
477                 /* XXX LOV STACKING: submd should be from the subobj */
478                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
479                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
480                 req->rq_oi.oi_md->lsm_stripe_count = 0;
481
482                 lov_set_add_req(req, set);
483         }
484         if (!set->set_count)
485                 GOTO(out_set, rc = -EIO);
486         *reqset = set;
487         RETURN(rc);
488 out_set:
489         lov_fini_match_set(set, mode, 0);
490         RETURN(rc);
491 }
492
493 int lov_fini_cancel_set(struct lov_request_set *set)
494 {
495         int rc = 0;
496         ENTRY;
497
498         if (set == NULL)
499                 RETURN(0);
500
501         LASSERT(set->set_exp);
502         if (set->set_lockh)
503                 lov_llh_put(set->set_lockh);
504
505         lov_put_reqset(set);
506
507         RETURN(rc);
508 }
509
510 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
511                         struct lov_stripe_md *lsm, __u32 mode,
512                         struct lustre_handle *lockh,
513                         struct lov_request_set **reqset)
514 {
515         struct lov_request_set *set;
516         int i, rc = 0;
517         ENTRY;
518
519         OBD_ALLOC(set, sizeof(*set));
520         if (set == NULL)
521                 RETURN(-ENOMEM);
522         lov_init_set(set);
523
524         set->set_exp = exp;
525         set->set_oi = oinfo;
526         set->set_oi->oi_md = lsm;
527         set->set_lockh = lov_handle2llh(lockh);
528         if (set->set_lockh == NULL) {
529                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
530                 GOTO(out_set, rc = -EINVAL);
531         }
532         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
533
534         for (i = 0; i < lsm->lsm_stripe_count; i++){
535                 struct lov_request *req;
536                 struct lustre_handle *lov_lockhp;
537                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
538
539                 lov_lockhp = set->set_lockh->llh_handles + i;
540                 if (!lustre_handle_is_used(lov_lockhp)) {
541                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
542                                loi->loi_ost_idx, loi->loi_id);
543                         continue;
544                 }
545
546                 OBD_ALLOC(req, sizeof(*req));
547                 if (req == NULL)
548                         GOTO(out_set, rc = -ENOMEM);
549
550                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
551                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
552                 if (req->rq_oi.oi_md == NULL) {
553                         OBD_FREE(req, sizeof(*req));
554                         GOTO(out_set, rc = -ENOMEM);
555                 }
556
557                 req->rq_idx = loi->loi_ost_idx;
558                 req->rq_stripe = i;
559
560                 /* XXX LOV STACKING: submd should be from the subobj */
561                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
562                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
563                 req->rq_oi.oi_md->lsm_stripe_count = 0;
564
565                 lov_set_add_req(req, set);
566         }
567         if (!set->set_count)
568                 GOTO(out_set, rc = -EIO);
569         *reqset = set;
570         RETURN(rc);
571 out_set:
572         lov_fini_cancel_set(set);
573         RETURN(rc);
574 }
575
576 static int lov_update_create_set(struct lov_request_set *set,
577                                  struct lov_request *req, int rc)
578 {
579         struct obd_trans_info *oti = set->set_oti;
580         struct lov_stripe_md *lsm = set->set_oi->oi_md;
581         struct lov_oinfo *loi;
582         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
583         ENTRY;
584
585         if (rc && lov->lov_tgts[req->rq_idx] &&
586             lov->lov_tgts[req->rq_idx]->ltd_active) {
587                 /* Pre-creating objects may timeout via -ETIMEDOUT or
588                  * -ENOTCONN both are always non-critical events. */
589                 CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR,
590                        "error creating fid "LPX64" sub-object "
591                        "on OST idx %d/%d: rc = %d\n",
592                        set->set_oi->oi_oa->o_id, req->rq_idx,
593                        lsm->lsm_stripe_count, rc);
594                 if (rc > 0) {
595                         CERROR("obd_create returned invalid err %d\n", rc);
596                         rc = -EIO;
597                 }
598         }
599
600         cfs_spin_lock(&set->set_lock);
601         req->rq_stripe = cfs_atomic_read(&set->set_success);
602         loi = lsm->lsm_oinfo[req->rq_stripe];
603
604
605         if (rc) {
606                 lov_update_set(set, req, rc);
607                 cfs_spin_unlock(&set->set_lock);
608                 RETURN(rc);
609         }
610
611         loi->loi_id = req->rq_oi.oi_oa->o_id;
612         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
613         loi->loi_ost_idx = req->rq_idx;
614         loi_init(loi);
615
616         if (oti && set->set_cookies)
617                 ++oti->oti_logcookies;
618         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
619                 set->set_cookie_sent++;
620
621         lov_update_set(set, req, rc);
622         cfs_spin_unlock(&set->set_lock);
623
624         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
625                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
626         RETURN(rc);
627 }
628
629 static int create_done(struct obd_export *exp, struct lov_request_set *set,
630                        struct lov_stripe_md **lsmp)
631 {
632         struct lov_obd *lov = &exp->exp_obd->u.lov;
633         struct obd_trans_info *oti = set->set_oti;
634         struct obdo *src_oa = set->set_oi->oi_oa;
635         struct lov_request *req;
636         struct obdo *ret_oa = NULL;
637         int success, attrset = 0, rc = 0;
638         ENTRY;
639
640         LASSERT(cfs_atomic_read(&set->set_completes));
641
642         /* try alloc objects on other osts if osc_create fails for
643          * exceptions: RPC failure, ENOSPC, etc */
644         if (set->set_count != cfs_atomic_read(&set->set_success)) {
645                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
646                         if (req->rq_rc == 0)
647                                 continue;
648
649                         cfs_atomic_dec(&set->set_completes);
650                         req->rq_complete = 0;
651
652                         rc = qos_remedy_create(set, req);
653                         lov_update_create_set(set, req, rc);
654                 }
655         }
656
657         success = cfs_atomic_read(&set->set_success);
658         /* no successful creates */
659         if (success == 0)
660                 GOTO(cleanup, rc);
661
662         if (set->set_count != success) {
663                 set->set_count = success;
664                 qos_shrink_lsm(set);
665         }
666
667         OBDO_ALLOC(ret_oa);
668         if (ret_oa == NULL)
669                 GOTO(cleanup, rc = -ENOMEM);
670
671         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
672                 if (!req->rq_complete || req->rq_rc)
673                         continue;
674                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
675                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
676                                 req->rq_stripe, &attrset);
677         }
678         if (src_oa->o_valid & OBD_MD_FLSIZE &&
679             ret_oa->o_size != src_oa->o_size) {
680                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
681                        src_oa->o_size, ret_oa->o_size);
682                 LBUG();
683         }
684         ret_oa->o_id = src_oa->o_id;
685         ret_oa->o_seq = src_oa->o_seq;
686         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
687         memcpy(src_oa, ret_oa, sizeof(*src_oa));
688         OBDO_FREE(ret_oa);
689
690         *lsmp = set->set_oi->oi_md;
691         GOTO(done, rc = 0);
692
693 cleanup:
694         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
695                 struct obd_export *sub_exp;
696                 int err = 0;
697
698                 if (!req->rq_complete || req->rq_rc)
699                         continue;
700
701                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
702                 err = obd_destroy(NULL, sub_exp, req->rq_oi.oi_oa, NULL, oti,
703                                   NULL, NULL);
704                 if (err)
705                         CERROR("Failed to uncreate objid "LPX64" subobj "
706                                LPX64" on OST idx %d: rc = %d\n",
707                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
708                                req->rq_idx, rc);
709         }
710         if (*lsmp == NULL)
711                 obd_free_memmd(exp, &set->set_oi->oi_md);
712 done:
713         if (oti && set->set_cookies) {
714                 oti->oti_logcookies = set->set_cookies;
715                 if (!set->set_cookie_sent) {
716                         oti_free_cookies(oti);
717                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
718                 } else {
719                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
720                 }
721         }
722         RETURN(rc);
723 }
724
725 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
726 {
727         int rc = 0;
728         ENTRY;
729
730         if (set == NULL)
731                 RETURN(0);
732         LASSERT(set->set_exp);
733         if (cfs_atomic_read(&set->set_completes))
734                 rc = create_done(set->set_exp, set, lsmp);
735
736         lov_put_reqset(set);
737         RETURN(rc);
738 }
739
740 int cb_create_update(void *cookie, int rc)
741 {
742         struct obd_info *oinfo = cookie;
743         struct lov_request *lovreq;
744
745         lovreq = container_of(oinfo, struct lov_request, rq_oi);
746
747         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
748                 if (lovreq->rq_idx == cfs_fail_val)
749                         rc = -ENOTCONN;
750
751         rc = lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
752         if (lov_set_finished(lovreq->rq_rqset, 0))
753                 lov_put_reqset(lovreq->rq_rqset);
754         return rc;
755 }
756
757 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
758                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
759                         struct obd_trans_info *oti,
760                         struct lov_request_set **reqset)
761 {
762         struct lov_request_set *set;
763         int rc = 0;
764         ENTRY;
765
766         OBD_ALLOC(set, sizeof(*set));
767         if (set == NULL)
768                 RETURN(-ENOMEM);
769         lov_init_set(set);
770
771         set->set_exp = exp;
772         set->set_oi = oinfo;
773         set->set_oi->oi_md = *lsmp;
774         set->set_oi->oi_oa = src_oa;
775         set->set_oti = oti;
776         lov_get_reqset(set);
777
778         rc = qos_prep_create(exp, set);
779         /* qos_shrink_lsm() may have allocated a new lsm */
780         *lsmp = oinfo->oi_md;
781         if (rc) {
782                 lov_fini_create_set(set, lsmp);
783                 lov_put_reqset(set);
784         } else {
785                 *reqset = set;
786         }
787         RETURN(rc);
788 }
789
790 static int common_attr_done(struct lov_request_set *set)
791 {
792         cfs_list_t *pos;
793         struct lov_request *req;
794         struct obdo *tmp_oa;
795         int rc = 0, attrset = 0;
796         ENTRY;
797
798         LASSERT(set->set_oi != NULL);
799
800         if (set->set_oi->oi_oa == NULL)
801                 RETURN(0);
802
803         if (!cfs_atomic_read(&set->set_success))
804                 RETURN(-EIO);
805
806         OBDO_ALLOC(tmp_oa);
807         if (tmp_oa == NULL)
808                 GOTO(out, rc = -ENOMEM);
809
810         cfs_list_for_each (pos, &set->set_list) {
811                 req = cfs_list_entry(pos, struct lov_request, rq_link);
812
813                 if (!req->rq_complete || req->rq_rc)
814                         continue;
815                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
816                         continue;
817                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
818                                 req->rq_oi.oi_oa->o_valid,
819                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
820         }
821         if (!attrset) {
822                 CERROR("No stripes had valid attrs\n");
823                 rc = -EIO;
824         }
825         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
826             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
827                 /* When we take attributes of some epoch, we require all the
828                  * ost to be active. */
829                 CERROR("Not all the stripes had valid attrs\n");
830                 GOTO(out, rc = -EIO);
831         }
832
833         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
834         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
835 out:
836         if (tmp_oa)
837                 OBDO_FREE(tmp_oa);
838         RETURN(rc);
839
840 }
841
842 static int brw_done(struct lov_request_set *set)
843 {
844         struct lov_stripe_md *lsm = set->set_oi->oi_md;
845         struct lov_oinfo     *loi = NULL;
846         cfs_list_t *pos;
847         struct lov_request *req;
848         ENTRY;
849
850         cfs_list_for_each (pos, &set->set_list) {
851                 req = cfs_list_entry(pos, struct lov_request, rq_link);
852
853                 if (!req->rq_complete || req->rq_rc)
854                         continue;
855
856                 loi = lsm->lsm_oinfo[req->rq_stripe];
857
858                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
859                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
860         }
861
862         RETURN(0);
863 }
864
865 int lov_fini_brw_set(struct lov_request_set *set)
866 {
867         int rc = 0;
868         ENTRY;
869
870         if (set == NULL)
871                 RETURN(0);
872         LASSERT(set->set_exp);
873         if (cfs_atomic_read(&set->set_completes)) {
874                 rc = brw_done(set);
875                 /* FIXME update qos data here */
876         }
877         lov_put_reqset(set);
878
879         RETURN(rc);
880 }
881
882 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
883                      obd_count oa_bufs, struct brw_page *pga,
884                      struct obd_trans_info *oti,
885                      struct lov_request_set **reqset)
886 {
887         struct {
888                 obd_count       index;
889                 obd_count       count;
890                 obd_count       off;
891         } *info = NULL;
892         struct lov_request_set *set;
893         struct lov_obd *lov = &exp->exp_obd->u.lov;
894         int rc = 0, i, shift;
895         ENTRY;
896
897         OBD_ALLOC(set, sizeof(*set));
898         if (set == NULL)
899                 RETURN(-ENOMEM);
900         lov_init_set(set);
901
902         set->set_exp = exp;
903         set->set_oti = oti;
904         set->set_oi = oinfo;
905         set->set_oabufs = oa_bufs;
906         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
907         if (!set->set_pga)
908                 GOTO(out, rc = -ENOMEM);
909
910         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
911         if (!info)
912                 GOTO(out, rc = -ENOMEM);
913
914         /* calculate the page count for each stripe */
915         for (i = 0; i < oa_bufs; i++) {
916                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
917                 info[stripe].count++;
918         }
919
920         /* alloc and initialize lov request */
921         shift = 0;
922         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
923                 struct lov_oinfo *loi = NULL;
924                 struct lov_request *req;
925
926                 if (info[i].count == 0)
927                         continue;
928
929                 loi = oinfo->oi_md->lsm_oinfo[i];
930                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
931                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
932                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
933                         GOTO(out, rc = -EIO);
934                 }
935
936                 OBD_ALLOC(req, sizeof(*req));
937                 if (req == NULL)
938                         GOTO(out, rc = -ENOMEM);
939
940                 OBDO_ALLOC(req->rq_oi.oi_oa);
941                 if (req->rq_oi.oi_oa == NULL) {
942                         OBD_FREE(req, sizeof(*req));
943                         GOTO(out, rc = -ENOMEM);
944                 }
945
946                 if (oinfo->oi_oa) {
947                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
948                                sizeof(*req->rq_oi.oi_oa));
949                 }
950                 req->rq_oi.oi_oa->o_id = loi->loi_id;
951                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
952                 req->rq_oi.oi_oa->o_stripe_idx = i;
953
954                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
955                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
956                 if (req->rq_oi.oi_md == NULL) {
957                         OBDO_FREE(req->rq_oi.oi_oa);
958                         OBD_FREE(req, sizeof(*req));
959                         GOTO(out, rc = -ENOMEM);
960                 }
961
962                 req->rq_idx = loi->loi_ost_idx;
963                 req->rq_stripe = i;
964
965                 /* XXX LOV STACKING */
966                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
967                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
968                 req->rq_oabufs = info[i].count;
969                 req->rq_pgaidx = shift;
970                 shift += req->rq_oabufs;
971
972                 /* remember the index for sort brw_page array */
973                 info[i].index = req->rq_pgaidx;
974
975                 req->rq_oi.oi_capa = oinfo->oi_capa;
976
977                 lov_set_add_req(req, set);
978         }
979         if (!set->set_count)
980                 GOTO(out, rc = -EIO);
981
982         /* rotate & sort the brw_page array */
983         for (i = 0; i < oa_bufs; i++) {
984                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
985
986                 shift = info[stripe].index + info[stripe].off;
987                 LASSERT(shift < oa_bufs);
988                 set->set_pga[shift] = pga[i];
989                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
990                                   &set->set_pga[shift].off);
991                 info[stripe].off++;
992         }
993 out:
994         if (info)
995                 OBD_FREE_LARGE(info,
996                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
997
998         if (rc == 0)
999                 *reqset = set;
1000         else
1001                 lov_fini_brw_set(set);
1002
1003         RETURN(rc);
1004 }
1005
1006 int lov_fini_getattr_set(struct lov_request_set *set)
1007 {
1008         int rc = 0;
1009         ENTRY;
1010
1011         if (set == NULL)
1012                 RETURN(0);
1013         LASSERT(set->set_exp);
1014         if (cfs_atomic_read(&set->set_completes))
1015                 rc = common_attr_done(set);
1016
1017         lov_put_reqset(set);
1018
1019         RETURN(rc);
1020 }
1021
1022 /* The callback for osc_getattr_async that finilizes a request info when a
1023  * response is received. */
1024 static int cb_getattr_update(void *cookie, int rc)
1025 {
1026         struct obd_info *oinfo = cookie;
1027         struct lov_request *lovreq;
1028         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1029         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1030 }
1031
1032 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1033                          struct lov_request_set **reqset)
1034 {
1035         struct lov_request_set *set;
1036         struct lov_obd *lov = &exp->exp_obd->u.lov;
1037         int rc = 0, i;
1038         ENTRY;
1039
1040         OBD_ALLOC(set, sizeof(*set));
1041         if (set == NULL)
1042                 RETURN(-ENOMEM);
1043         lov_init_set(set);
1044
1045         set->set_exp = exp;
1046         set->set_oi = oinfo;
1047
1048         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1049                 struct lov_oinfo *loi;
1050                 struct lov_request *req;
1051
1052                 loi = oinfo->oi_md->lsm_oinfo[i];
1053                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1054                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1055                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1056                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1057                                 /* SOM requires all the OSTs to be active. */
1058                                 GOTO(out_set, rc = -EIO);
1059                         continue;
1060                 }
1061
1062                 OBD_ALLOC(req, sizeof(*req));
1063                 if (req == NULL)
1064                         GOTO(out_set, rc = -ENOMEM);
1065
1066                 req->rq_stripe = i;
1067                 req->rq_idx = loi->loi_ost_idx;
1068
1069                 OBDO_ALLOC(req->rq_oi.oi_oa);
1070                 if (req->rq_oi.oi_oa == NULL) {
1071                         OBD_FREE(req, sizeof(*req));
1072                         GOTO(out_set, rc = -ENOMEM);
1073                 }
1074                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1075                        sizeof(*req->rq_oi.oi_oa));
1076                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1077                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1078                 req->rq_oi.oi_cb_up = cb_getattr_update;
1079                 req->rq_oi.oi_capa = oinfo->oi_capa;
1080
1081                 lov_set_add_req(req, set);
1082         }
1083         if (!set->set_count)
1084                 GOTO(out_set, rc = -EIO);
1085         *reqset = set;
1086         RETURN(rc);
1087 out_set:
1088         lov_fini_getattr_set(set);
1089         RETURN(rc);
1090 }
1091
1092 int lov_fini_destroy_set(struct lov_request_set *set)
1093 {
1094         ENTRY;
1095
1096         if (set == NULL)
1097                 RETURN(0);
1098         LASSERT(set->set_exp);
1099         if (cfs_atomic_read(&set->set_completes)) {
1100                 /* FIXME update qos data here */
1101         }
1102
1103         lov_put_reqset(set);
1104
1105         RETURN(0);
1106 }
1107
1108 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1109                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1110                          struct obd_trans_info *oti,
1111                          struct lov_request_set **reqset)
1112 {
1113         struct lov_request_set *set;
1114         struct lov_obd *lov = &exp->exp_obd->u.lov;
1115         int rc = 0, i;
1116         ENTRY;
1117
1118         OBD_ALLOC(set, sizeof(*set));
1119         if (set == NULL)
1120                 RETURN(-ENOMEM);
1121         lov_init_set(set);
1122
1123         set->set_exp = exp;
1124         set->set_oi = oinfo;
1125         set->set_oi->oi_md = lsm;
1126         set->set_oi->oi_oa = src_oa;
1127         set->set_oti = oti;
1128         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1129                 set->set_cookies = oti->oti_logcookies;
1130
1131         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1132                 struct lov_oinfo *loi;
1133                 struct lov_request *req;
1134
1135                 loi = lsm->lsm_oinfo[i];
1136                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1137                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1138                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1139                         continue;
1140                 }
1141
1142                 OBD_ALLOC(req, sizeof(*req));
1143                 if (req == NULL)
1144                         GOTO(out_set, rc = -ENOMEM);
1145
1146                 req->rq_stripe = i;
1147                 req->rq_idx = loi->loi_ost_idx;
1148
1149                 OBDO_ALLOC(req->rq_oi.oi_oa);
1150                 if (req->rq_oi.oi_oa == NULL) {
1151                         OBD_FREE(req, sizeof(*req));
1152                         GOTO(out_set, rc = -ENOMEM);
1153                 }
1154                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1155                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1156                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1157                 lov_set_add_req(req, set);
1158         }
1159         if (!set->set_count)
1160                 GOTO(out_set, rc = -EIO);
1161         *reqset = set;
1162         RETURN(rc);
1163 out_set:
1164         lov_fini_destroy_set(set);
1165         RETURN(rc);
1166 }
1167
1168 int lov_fini_setattr_set(struct lov_request_set *set)
1169 {
1170         int rc = 0;
1171         ENTRY;
1172
1173         if (set == NULL)
1174                 RETURN(0);
1175         LASSERT(set->set_exp);
1176         if (cfs_atomic_read(&set->set_completes)) {
1177                 rc = common_attr_done(set);
1178                 /* FIXME update qos data here */
1179         }
1180
1181         lov_put_reqset(set);
1182         RETURN(rc);
1183 }
1184
1185 int lov_update_setattr_set(struct lov_request_set *set,
1186                            struct lov_request *req, int rc)
1187 {
1188         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1189         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1190         ENTRY;
1191
1192         lov_update_set(set, req, rc);
1193
1194         /* grace error on inactive ost */
1195         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1196                     lov->lov_tgts[req->rq_idx]->ltd_active))
1197                 rc = 0;
1198
1199         if (rc == 0) {
1200                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1201                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1202                                 req->rq_oi.oi_oa->o_ctime;
1203                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1204                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1205                                 req->rq_oi.oi_oa->o_mtime;
1206                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1207                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1208                                 req->rq_oi.oi_oa->o_atime;
1209         }
1210
1211         RETURN(rc);
1212 }
1213
1214 /* The callback for osc_setattr_async that finilizes a request info when a
1215  * response is received. */
1216 static int cb_setattr_update(void *cookie, int rc)
1217 {
1218         struct obd_info *oinfo = cookie;
1219         struct lov_request *lovreq;
1220         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1221         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1222 }
1223
1224 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1225                          struct obd_trans_info *oti,
1226                          struct lov_request_set **reqset)
1227 {
1228         struct lov_request_set *set;
1229         struct lov_obd *lov = &exp->exp_obd->u.lov;
1230         int rc = 0, i;
1231         ENTRY;
1232
1233         OBD_ALLOC(set, sizeof(*set));
1234         if (set == NULL)
1235                 RETURN(-ENOMEM);
1236         lov_init_set(set);
1237
1238         set->set_exp = exp;
1239         set->set_oti = oti;
1240         set->set_oi = oinfo;
1241         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1242                 set->set_cookies = oti->oti_logcookies;
1243
1244         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1245                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1246                 struct lov_request *req;
1247
1248                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1249                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1250                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1251                         continue;
1252                 }
1253
1254                 OBD_ALLOC(req, sizeof(*req));
1255                 if (req == NULL)
1256                         GOTO(out_set, rc = -ENOMEM);
1257                 req->rq_stripe = i;
1258                 req->rq_idx = loi->loi_ost_idx;
1259
1260                 OBDO_ALLOC(req->rq_oi.oi_oa);
1261                 if (req->rq_oi.oi_oa == NULL) {
1262                         OBD_FREE(req, sizeof(*req));
1263                         GOTO(out_set, rc = -ENOMEM);
1264                 }
1265                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1266                        sizeof(*req->rq_oi.oi_oa));
1267                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1268                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1269                 req->rq_oi.oi_oa->o_stripe_idx = i;
1270                 req->rq_oi.oi_cb_up = cb_setattr_update;
1271                 req->rq_oi.oi_capa = oinfo->oi_capa;
1272
1273                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1274                         int off = lov_stripe_offset(oinfo->oi_md,
1275                                                     oinfo->oi_oa->o_size, i,
1276                                                     &req->rq_oi.oi_oa->o_size);
1277
1278                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1279                                 req->rq_oi.oi_oa->o_size--;
1280
1281                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1282                                i, req->rq_oi.oi_oa->o_size,
1283                                oinfo->oi_oa->o_size);
1284                 }
1285                 lov_set_add_req(req, set);
1286         }
1287         if (!set->set_count)
1288                 GOTO(out_set, rc = -EIO);
1289         *reqset = set;
1290         RETURN(rc);
1291 out_set:
1292         lov_fini_setattr_set(set);
1293         RETURN(rc);
1294 }
1295
1296 int lov_fini_punch_set(struct lov_request_set *set)
1297 {
1298         int rc = 0;
1299         ENTRY;
1300
1301         if (set == NULL)
1302                 RETURN(0);
1303         LASSERT(set->set_exp);
1304         if (cfs_atomic_read(&set->set_completes)) {
1305                 rc = -EIO;
1306                 /* FIXME update qos data here */
1307                 if (cfs_atomic_read(&set->set_success))
1308                         rc = common_attr_done(set);
1309         }
1310
1311         lov_put_reqset(set);
1312
1313         RETURN(rc);
1314 }
1315
1316 int lov_update_punch_set(struct lov_request_set *set,
1317                          struct lov_request *req, int rc)
1318 {
1319         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1320         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1321         ENTRY;
1322
1323         lov_update_set(set, req, rc);
1324
1325         /* grace error on inactive ost */
1326         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1327                 rc = 0;
1328
1329         if (rc == 0) {
1330                 lov_stripe_lock(lsm);
1331                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1332                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1333                                 req->rq_oi.oi_oa->o_blocks;
1334                 }
1335
1336                 /* Do we need to update lvb_size here? It needn't because
1337                  * it have been done in ll_truncate(). -jay */
1338                 lov_stripe_unlock(lsm);
1339         }
1340
1341         RETURN(rc);
1342 }
1343
1344 /* The callback for osc_punch that finilizes a request info when a response
1345  * is received. */
1346 static int cb_update_punch(void *cookie, int rc)
1347 {
1348         struct obd_info *oinfo = cookie;
1349         struct lov_request *lovreq;
1350         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1351         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1352 }
1353
1354 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1355                        struct obd_trans_info *oti,
1356                        struct lov_request_set **reqset)
1357 {
1358         struct lov_request_set *set;
1359         struct lov_obd *lov = &exp->exp_obd->u.lov;
1360         int rc = 0, i;
1361         ENTRY;
1362
1363         OBD_ALLOC(set, sizeof(*set));
1364         if (set == NULL)
1365                 RETURN(-ENOMEM);
1366         lov_init_set(set);
1367
1368         set->set_oi = oinfo;
1369         set->set_exp = exp;
1370
1371         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1372                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1373                 struct lov_request *req;
1374                 obd_off rs, re;
1375
1376                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1377                                            oinfo->oi_policy.l_extent.start,
1378                                            oinfo->oi_policy.l_extent.end,
1379                                            &rs, &re))
1380                         continue;
1381
1382                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1383                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1384                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1385                         GOTO(out_set, rc = -EIO);
1386                 }
1387
1388                 OBD_ALLOC(req, sizeof(*req));
1389                 if (req == NULL)
1390                         GOTO(out_set, rc = -ENOMEM);
1391                 req->rq_stripe = i;
1392                 req->rq_idx = loi->loi_ost_idx;
1393
1394                 OBDO_ALLOC(req->rq_oi.oi_oa);
1395                 if (req->rq_oi.oi_oa == NULL) {
1396                         OBD_FREE(req, sizeof(*req));
1397                         GOTO(out_set, rc = -ENOMEM);
1398                 }
1399                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1400                        sizeof(*req->rq_oi.oi_oa));
1401                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1402                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1403                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1404
1405                 req->rq_oi.oi_oa->o_stripe_idx = i;
1406                 req->rq_oi.oi_cb_up = cb_update_punch;
1407
1408                 req->rq_oi.oi_policy.l_extent.start = rs;
1409                 req->rq_oi.oi_policy.l_extent.end = re;
1410                 req->rq_oi.oi_policy.l_extent.gid = -1;
1411
1412                 req->rq_oi.oi_capa = oinfo->oi_capa;
1413
1414                 lov_set_add_req(req, set);
1415         }
1416         if (!set->set_count)
1417                 GOTO(out_set, rc = -EIO);
1418         *reqset = set;
1419         RETURN(rc);
1420 out_set:
1421         lov_fini_punch_set(set);
1422         RETURN(rc);
1423 }
1424
1425 int lov_fini_sync_set(struct lov_request_set *set)
1426 {
1427         int rc = 0;
1428         ENTRY;
1429
1430         if (set == NULL)
1431                 RETURN(0);
1432         LASSERT(set->set_exp);
1433         if (cfs_atomic_read(&set->set_completes)) {
1434                 if (!cfs_atomic_read(&set->set_success))
1435                         rc = -EIO;
1436                 /* FIXME update qos data here */
1437         }
1438
1439         lov_put_reqset(set);
1440
1441         RETURN(rc);
1442 }
1443
1444 /* The callback for osc_sync that finilizes a request info when a
1445  * response is recieved. */
1446 static int cb_sync_update(void *cookie, int rc)
1447 {
1448         struct obd_info *oinfo = cookie;
1449         struct lov_request *lovreq;
1450
1451         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1452         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1453 }
1454
1455 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1456                       obd_off start, obd_off end,
1457                       struct lov_request_set **reqset)
1458 {
1459         struct lov_request_set *set;
1460         struct lov_obd *lov = &exp->exp_obd->u.lov;
1461         int rc = 0, i;
1462         ENTRY;
1463
1464         OBD_ALLOC_PTR(set);
1465         if (set == NULL)
1466                 RETURN(-ENOMEM);
1467         lov_init_set(set);
1468
1469         set->set_exp = exp;
1470         set->set_oi = oinfo;
1471
1472         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1473                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1474                 struct lov_request *req;
1475                 obd_off rs, re;
1476
1477                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1478                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1479                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1480                         continue;
1481                 }
1482
1483                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1484                                            &re))
1485                         continue;
1486
1487                 OBD_ALLOC_PTR(req);
1488                 if (req == NULL)
1489                         GOTO(out_set, rc = -ENOMEM);
1490                 req->rq_stripe = i;
1491                 req->rq_idx = loi->loi_ost_idx;
1492
1493                 OBDO_ALLOC(req->rq_oi.oi_oa);
1494                 if (req->rq_oi.oi_oa == NULL) {
1495                         OBD_FREE(req, sizeof(*req));
1496                         GOTO(out_set, rc = -ENOMEM);
1497                 }
1498                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1499                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1500                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1501                 req->rq_oi.oi_oa->o_stripe_idx = i;
1502
1503                 req->rq_oi.oi_policy.l_extent.start = rs;
1504                 req->rq_oi.oi_policy.l_extent.end = re;
1505                 req->rq_oi.oi_policy.l_extent.gid = -1;
1506                 req->rq_oi.oi_cb_up = cb_sync_update;
1507
1508                 lov_set_add_req(req, set);
1509         }
1510         if (!set->set_count)
1511                 GOTO(out_set, rc = -EIO);
1512         *reqset = set;
1513         RETURN(rc);
1514 out_set:
1515         lov_fini_sync_set(set);
1516         RETURN(rc);
1517 }
1518
/* All-ones __u64: the value LOV_SUM_MAX saturates to. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: add @add to @tot, clamping @tot to LOV_U64_MAX
 * when the unsigned sum wraps around.  Both arguments are expanded more
 * than once, so they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1527
1528 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1529 {
1530         ENTRY;
1531
1532         if (success) {
1533                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1534                                                            LOV_MAGIC, 0);
1535                 if (osfs->os_files != LOV_U64_MAX)
1536                         lov_do_div64(osfs->os_files, expected_stripes);
1537                 if (osfs->os_ffree != LOV_U64_MAX)
1538                         lov_do_div64(osfs->os_ffree, expected_stripes);
1539
1540                 cfs_spin_lock(&obd->obd_osfs_lock);
1541                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1542                 obd->obd_osfs_age = cfs_time_current_64();
1543                 cfs_spin_unlock(&obd->obd_osfs_lock);
1544                 RETURN(0);
1545         }
1546
1547         RETURN(-EIO);
1548 }
1549
1550 int lov_fini_statfs_set(struct lov_request_set *set)
1551 {
1552         int rc = 0;
1553         ENTRY;
1554
1555         if (set == NULL)
1556                 RETURN(0);
1557
1558         if (cfs_atomic_read(&set->set_completes)) {
1559                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1560                                      cfs_atomic_read(&set->set_success));
1561         }
1562         lov_put_reqset(set);
1563         RETURN(rc);
1564 }
1565
1566 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1567                        int success)
1568 {
1569         int shift = 0, quit = 0;
1570         __u64 tmp;
1571
1572         if (success == 0) {
1573                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1574         } else {
1575                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1576                         /* assume all block sizes are always powers of 2 */
1577                         /* get the bits difference */
1578                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1579                         for (shift = 0; shift <= 64; ++shift) {
1580                                 if (tmp & 1) {
1581                                         if (quit)
1582                                                 break;
1583                                         else
1584                                                 quit = 1;
1585                                         shift = 0;
1586                                 }
1587                                 tmp >>= 1;
1588                         }
1589                 }
1590
1591                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1592                         osfs->os_bsize = lov_sfs->os_bsize;
1593
1594                         osfs->os_bfree  >>= shift;
1595                         osfs->os_bavail >>= shift;
1596                         osfs->os_blocks >>= shift;
1597                 } else if (shift != 0) {
1598                         lov_sfs->os_bfree  >>= shift;
1599                         lov_sfs->os_bavail >>= shift;
1600                         lov_sfs->os_blocks >>= shift;
1601                 }
1602 #ifdef MIN_DF
1603                 /* Sandia requested that df (and so, statfs) only
1604                    returned minimal available space on
1605                    a single OST, so people would be able to
1606                    write this much data guaranteed. */
1607                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1608                         /* Presumably if new bavail is smaller,
1609                            new bfree is bigger as well */
1610                         osfs->os_bfree = lov_sfs->os_bfree;
1611                         osfs->os_bavail = lov_sfs->os_bavail;
1612                 }
1613 #else
1614                 osfs->os_bfree += lov_sfs->os_bfree;
1615                 osfs->os_bavail += lov_sfs->os_bavail;
1616 #endif
1617                 osfs->os_blocks += lov_sfs->os_blocks;
1618                 /* XXX not sure about this one - depends on policy.
1619                  *   - could be minimum if we always stripe on all OBDs
1620                  *     (but that would be wrong for any other policy,
1621                  *     if one of the OBDs has no more objects left)
1622                  *   - could be sum if we stripe whole objects
1623                  *   - could be average, just to give a nice number
1624                  *
1625                  * To give a "reasonable" (if not wholly accurate)
1626                  * number, we divide the total number of free objects
1627                  * by expected stripe count (watch out for overflow).
1628                  */
1629                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1630                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1631         }
1632 }
1633
1634 /* The callback for osc_statfs_async that finilizes a request info when a
1635  * response is received. */
1636 static int cb_statfs_update(void *cookie, int rc)
1637 {
1638         struct obd_info *oinfo = cookie;
1639         struct lov_request *lovreq;
1640         struct lov_request_set *set;
1641         struct obd_statfs *osfs, *lov_sfs;
1642         struct lov_obd *lov;
1643         struct lov_tgt_desc *tgt;
1644         struct obd_device *lovobd, *tgtobd;
1645         int success;
1646         ENTRY;
1647
1648         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1649         set = lovreq->rq_rqset;
1650         lovobd = set->set_obd;
1651         lov = &lovobd->u.lov;
1652         osfs = set->set_oi->oi_osfs;
1653         lov_sfs = oinfo->oi_osfs;
1654         success = cfs_atomic_read(&set->set_success);
1655         /* XXX: the same is done in lov_update_common_set, however
1656            lovset->set_exp is not initialized. */
1657         lov_update_set(set, lovreq, rc);
1658         if (rc)
1659                 GOTO(out, rc);
1660
1661         obd_getref(lovobd);
1662         tgt = lov->lov_tgts[lovreq->rq_idx];
1663         if (!tgt || !tgt->ltd_active)
1664                 GOTO(out_update, rc);
1665
1666         tgtobd = class_exp2obd(tgt->ltd_exp);
1667         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1668         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1669         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1670                 tgtobd->obd_osfs_age = cfs_time_current_64();
1671         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1672
1673 out_update:
1674         lov_update_statfs(osfs, lov_sfs, success);
1675         qos_update(lov);
1676         obd_putref(lovobd);
1677
1678 out:
1679         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1680             lov_set_finished(set, 0)) {
1681                 lov_statfs_interpret(NULL, set, set->set_count !=
1682                                      cfs_atomic_read(&set->set_success));
1683                 if (lov->lov_qos.lq_statfs_in_progress)
1684                         qos_statfs_done(lov);
1685         }
1686
1687         RETURN(0);
1688 }
1689
1690 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1691                         struct lov_request_set **reqset)
1692 {
1693         struct lov_request_set *set;
1694         struct lov_obd *lov = &obd->u.lov;
1695         int rc = 0, i;
1696         ENTRY;
1697
1698         OBD_ALLOC(set, sizeof(*set));
1699         if (set == NULL)
1700                 RETURN(-ENOMEM);
1701         lov_init_set(set);
1702
1703         set->set_obd = obd;
1704         set->set_oi = oinfo;
1705
1706         /* We only get block data from the OBD */
1707         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1708                 struct lov_request *req;
1709
1710                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1711                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1712                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1713                         continue;
1714                 }
1715
1716                 /* skip targets that have been explicitely disabled by the
1717                  * administrator */
1718                 if (!lov->lov_tgts[i]->ltd_exp) {
1719                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1720                         continue;
1721                 }
1722
1723                 OBD_ALLOC(req, sizeof(*req));
1724                 if (req == NULL)
1725                         GOTO(out_set, rc = -ENOMEM);
1726
1727                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1728                 if (req->rq_oi.oi_osfs == NULL) {
1729                         OBD_FREE(req, sizeof(*req));
1730                         GOTO(out_set, rc = -ENOMEM);
1731                 }
1732
1733                 req->rq_idx = i;
1734                 req->rq_oi.oi_cb_up = cb_statfs_update;
1735                 req->rq_oi.oi_flags = oinfo->oi_flags;
1736
1737                 lov_set_add_req(req, set);
1738         }
1739         if (!set->set_count)
1740                 GOTO(out_set, rc = -EIO);
1741         *reqset = set;
1742         RETURN(rc);
1743 out_set:
1744         lov_fini_statfs_set(set);
1745         RETURN(rc);
1746 }