Whamcloud - gitweb
LU-2682 fid: cleanup direct _id and _seq access
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #ifdef __KERNEL__
40 #include <libcfs/libcfs.h>
41 #else
42 #include <liblustre.h>
43 #endif
44
45 #include <obd_class.h>
46 #include <obd_lov.h>
47 #include <lustre/lustre_idl.h>
48
49 #include "lov_internal.h"
50
51 static void lov_init_set(struct lov_request_set *set)
52 {
53         set->set_count = 0;
54         cfs_atomic_set(&set->set_completes, 0);
55         cfs_atomic_set(&set->set_success, 0);
56         cfs_atomic_set(&set->set_finish_checked, 0);
57         set->set_cookies = 0;
58         CFS_INIT_LIST_HEAD(&set->set_list);
59         cfs_atomic_set(&set->set_refcount, 1);
60         cfs_waitq_init(&set->set_waitq);
61         spin_lock_init(&set->set_lock);
62 }
63
64 void lov_finish_set(struct lov_request_set *set)
65 {
66         cfs_list_t *pos, *n;
67         ENTRY;
68
69         LASSERT(set);
70         cfs_list_for_each_safe(pos, n, &set->set_list) {
71                 struct lov_request *req = cfs_list_entry(pos,
72                                                          struct lov_request,
73                                                          rq_link);
74                 cfs_list_del_init(&req->rq_link);
75
76                 if (req->rq_oi.oi_oa)
77                         OBDO_FREE(req->rq_oi.oi_oa);
78                 if (req->rq_oi.oi_md)
79                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
80                 if (req->rq_oi.oi_osfs)
81                         OBD_FREE(req->rq_oi.oi_osfs,
82                                  sizeof(*req->rq_oi.oi_osfs));
83                 OBD_FREE(req, sizeof(*req));
84         }
85
86         if (set->set_pga) {
87                 int len = set->set_oabufs * sizeof(*set->set_pga);
88                 OBD_FREE_LARGE(set->set_pga, len);
89         }
90         if (set->set_lockh)
91                 lov_llh_put(set->set_lockh);
92
93         OBD_FREE(set, sizeof(*set));
94         EXIT;
95 }
96
97 int lov_set_finished(struct lov_request_set *set, int idempotent)
98 {
99         int completes = cfs_atomic_read(&set->set_completes);
100
101         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
102
103         if (completes == set->set_count) {
104                 if (idempotent)
105                         return 1;
106                 if (cfs_atomic_inc_return(&set->set_finish_checked) == 1)
107                         return 1;
108         }
109         return 0;
110 }
111
112 void lov_update_set(struct lov_request_set *set,
113                     struct lov_request *req, int rc)
114 {
115         req->rq_complete = 1;
116         req->rq_rc = rc;
117
118         cfs_atomic_inc(&set->set_completes);
119         if (rc == 0)
120                 cfs_atomic_inc(&set->set_success);
121
122         cfs_waitq_signal(&set->set_waitq);
123 }
124
125 int lov_update_common_set(struct lov_request_set *set,
126                           struct lov_request *req, int rc)
127 {
128         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
129         ENTRY;
130
131         lov_update_set(set, req, rc);
132
133         /* grace error on inactive ost */
134         if (rc && !(lov->lov_tgts[req->rq_idx] &&
135                     lov->lov_tgts[req->rq_idx]->ltd_active))
136                 rc = 0;
137
138         /* FIXME in raid1 regime, should return 0 */
139         RETURN(rc);
140 }
141
142 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
143 {
144         cfs_list_add_tail(&req->rq_link, &set->set_list);
145         set->set_count++;
146         req->rq_rqset = set;
147 }
148
149 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
150                                struct lov_oinfo *loi, int flags,
151                                struct ost_lvb *lvb, __u32 mode, int rc);
152
153 static int lov_update_enqueue_lov(struct obd_export *exp,
154                                   struct lustre_handle *lov_lockhp,
155                                   struct lov_oinfo *loi, int flags, int idx,
156                                   struct ost_id *oi, int rc)
157 {
158         struct lov_obd *lov = &exp->exp_obd->u.lov;
159
160         if (rc != ELDLM_OK &&
161             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
162                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
163                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
164                         /* -EUSERS used by OST to report file contention */
165                         if (rc != -EINTR && rc != -EUSERS)
166                                 CERROR("%s: enqueue objid "DOSTID" subobj"
167                                        DOSTID" on OST idx %d: rc %d\n",
168                                        exp->exp_obd->obd_name,
169                                        POSTID(oi), POSTID(&loi->loi_oi),
170                                        loi->loi_ost_idx, rc);
171                 } else
172                         rc = ELDLM_OK;
173         }
174         return rc;
175 }
176
177 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
178 {
179         struct lov_request_set *set = req->rq_rqset;
180         struct lustre_handle *lov_lockhp;
181         struct obd_info *oi = set->set_oi;
182         struct lov_oinfo *loi;
183         ENTRY;
184
185         LASSERT(oi != NULL);
186
187         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
188         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
189
190         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
191          * and that copy can be arbitrarily out of date.
192          *
193          * The LOV API is due for a serious rewriting anyways, and this
194          * can be addressed then. */
195
196         lov_stripe_lock(oi->oi_md);
197         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
198                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
199         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
200                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
201         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
202                                     req->rq_idx, &oi->oi_md->lsm_oi, rc);
203         lov_stripe_unlock(oi->oi_md);
204         lov_update_set(set, req, rc);
205         RETURN(rc);
206 }
207
208 /* The callback for osc_enqueue that updates lov info for every OSC request. */
209 static int cb_update_enqueue(void *cookie, int rc)
210 {
211         struct obd_info *oinfo = cookie;
212         struct ldlm_enqueue_info *einfo;
213         struct lov_request *lovreq;
214
215         lovreq = container_of(oinfo, struct lov_request, rq_oi);
216         einfo = lovreq->rq_rqset->set_ei;
217         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
218 }
219
220 static int enqueue_done(struct lov_request_set *set, __u32 mode)
221 {
222         struct lov_request *req;
223         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
224         int completes = cfs_atomic_read(&set->set_completes);
225         int rc = 0;
226         ENTRY;
227
228         /* enqueue/match success, just return */
229         if (completes && completes == cfs_atomic_read(&set->set_success))
230                 RETURN(0);
231
232         /* cancel enqueued/matched locks */
233         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
234                 struct lustre_handle *lov_lockhp;
235
236                 if (!req->rq_complete || req->rq_rc)
237                         continue;
238
239                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
240                 LASSERT(lov_lockhp);
241                 if (!lustre_handle_is_used(lov_lockhp))
242                         continue;
243
244                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
245                                 req->rq_oi.oi_md, mode, lov_lockhp);
246                 if (rc && lov->lov_tgts[req->rq_idx] &&
247                     lov->lov_tgts[req->rq_idx]->ltd_active)
248                         CERROR("%s: cancelling obdjid "DOSTID" on OST"
249                                "idx %d error: rc = %d\n",
250                                set->set_exp->exp_obd->obd_name,
251                                POSTID(&req->rq_oi.oi_md->lsm_oi),
252                                req->rq_idx, rc);
253         }
254         if (set->set_lockh)
255                 lov_llh_put(set->set_lockh);
256         RETURN(rc);
257 }
258
259 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
260                          struct ptlrpc_request_set *rqset)
261 {
262         int ret = 0;
263         ENTRY;
264
265         if (set == NULL)
266                 RETURN(0);
267         LASSERT(set->set_exp);
268         /* Do enqueue_done only for sync requests and if any request
269          * succeeded. */
270         if (!rqset) {
271                 if (rc)
272                         cfs_atomic_set(&set->set_completes, 0);
273                 ret = enqueue_done(set, mode);
274         } else if (set->set_lockh)
275                 lov_llh_put(set->set_lockh);
276
277         lov_put_reqset(set);
278
279         RETURN(rc ? rc : ret);
280 }
281
282 static void lov_llh_addref(void *llhp)
283 {
284         struct lov_lock_handles *llh = llhp;
285
286         cfs_atomic_inc(&llh->llh_refcount);
287         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
288                cfs_atomic_read(&llh->llh_refcount));
289 }
290
291 static struct portals_handle_ops lov_handle_ops = {
292         .hop_addref = lov_llh_addref,
293         .hop_free   = NULL,
294 };
295
296 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
297 {
298         struct lov_lock_handles *llh;
299
300         OBD_ALLOC(llh, sizeof *llh +
301                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
302         if (llh == NULL)
303                 return NULL;
304
305         cfs_atomic_set(&llh->llh_refcount, 2);
306         llh->llh_stripe_count = lsm->lsm_stripe_count;
307         CFS_INIT_LIST_HEAD(&llh->llh_handle.h_link);
308         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
309
310         return llh;
311 }
312
313 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
314                          struct ldlm_enqueue_info *einfo,
315                          struct lov_request_set **reqset)
316 {
317         struct lov_obd *lov = &exp->exp_obd->u.lov;
318         struct lov_request_set *set;
319         int i, rc = 0;
320         ENTRY;
321
322         OBD_ALLOC(set, sizeof(*set));
323         if (set == NULL)
324                 RETURN(-ENOMEM);
325         lov_init_set(set);
326
327         set->set_exp = exp;
328         set->set_oi = oinfo;
329         set->set_ei = einfo;
330         set->set_lockh = lov_llh_new(oinfo->oi_md);
331         if (set->set_lockh == NULL)
332                 GOTO(out_set, rc = -ENOMEM);
333         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
334
335         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
336                 struct lov_oinfo *loi;
337                 struct lov_request *req;
338                 obd_off start, end;
339
340                 loi = oinfo->oi_md->lsm_oinfo[i];
341                 if (!lov_stripe_intersects(oinfo->oi_md, i,
342                                            oinfo->oi_policy.l_extent.start,
343                                            oinfo->oi_policy.l_extent.end,
344                                            &start, &end))
345                         continue;
346
347                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
348                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
349                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
350                         continue;
351                 }
352
353                 OBD_ALLOC(req, sizeof(*req));
354                 if (req == NULL)
355                         GOTO(out_set, rc = -ENOMEM);
356
357                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
358                         sizeof(struct lov_oinfo *) +
359                         sizeof(struct lov_oinfo);
360                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
361                 if (req->rq_oi.oi_md == NULL) {
362                         OBD_FREE(req, sizeof(*req));
363                         GOTO(out_set, rc = -ENOMEM);
364                 }
365                 req->rq_oi.oi_md->lsm_oinfo[0] =
366                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
367                         sizeof(struct lov_oinfo *);
368
369                 /* Set lov request specific parameters. */
370                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
371                 req->rq_oi.oi_cb_up = cb_update_enqueue;
372                 req->rq_oi.oi_flags = oinfo->oi_flags;
373
374                 LASSERT(req->rq_oi.oi_lockh);
375
376                 req->rq_oi.oi_policy.l_extent.gid =
377                         oinfo->oi_policy.l_extent.gid;
378                 req->rq_oi.oi_policy.l_extent.start = start;
379                 req->rq_oi.oi_policy.l_extent.end = end;
380
381                 req->rq_idx = loi->loi_ost_idx;
382                 req->rq_stripe = i;
383
384                 /* XXX LOV STACKING: submd should be from the subobj */
385                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
386                 req->rq_oi.oi_md->lsm_stripe_count = 0;
387                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
388                         loi->loi_kms_valid;
389                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
390                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
391
392                 lov_set_add_req(req, set);
393         }
394         if (!set->set_count)
395                 GOTO(out_set, rc = -EIO);
396         *reqset = set;
397         RETURN(0);
398 out_set:
399         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
400         RETURN(rc);
401 }
402
403 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
404 {
405         int rc = 0;
406         ENTRY;
407
408         if (set == NULL)
409                 RETURN(0);
410         LASSERT(set->set_exp);
411         rc = enqueue_done(set, mode);
412         if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
413             (flags & LDLM_FL_TEST_LOCK))
414                 lov_llh_put(set->set_lockh);
415
416         lov_put_reqset(set);
417
418         RETURN(rc);
419 }
420
421 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
422                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
423                        __u32 mode, struct lustre_handle *lockh,
424                        struct lov_request_set **reqset)
425 {
426         struct lov_obd *lov = &exp->exp_obd->u.lov;
427         struct lov_request_set *set;
428         int i, rc = 0;
429         ENTRY;
430
431         OBD_ALLOC(set, sizeof(*set));
432         if (set == NULL)
433                 RETURN(-ENOMEM);
434         lov_init_set(set);
435
436         set->set_exp = exp;
437         set->set_oi = oinfo;
438         set->set_oi->oi_md = lsm;
439         set->set_lockh = lov_llh_new(lsm);
440         if (set->set_lockh == NULL)
441                 GOTO(out_set, rc = -ENOMEM);
442         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
443
444         for (i = 0; i < lsm->lsm_stripe_count; i++){
445                 struct lov_oinfo *loi;
446                 struct lov_request *req;
447                 obd_off start, end;
448
449                 loi = lsm->lsm_oinfo[i];
450                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
451                                            policy->l_extent.end, &start, &end))
452                         continue;
453
454                 /* FIXME raid1 should grace this error */
455                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
456                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
457                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
458                         GOTO(out_set, rc = -EIO);
459                 }
460
461                 OBD_ALLOC(req, sizeof(*req));
462                 if (req == NULL)
463                         GOTO(out_set, rc = -ENOMEM);
464
465                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
466                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
467                 if (req->rq_oi.oi_md == NULL) {
468                         OBD_FREE(req, sizeof(*req));
469                         GOTO(out_set, rc = -ENOMEM);
470                 }
471
472                 req->rq_oi.oi_policy.l_extent.start = start;
473                 req->rq_oi.oi_policy.l_extent.end = end;
474                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
475
476                 req->rq_idx = loi->loi_ost_idx;
477                 req->rq_stripe = i;
478
479                 /* XXX LOV STACKING: submd should be from the subobj */
480                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
481                 req->rq_oi.oi_md->lsm_stripe_count = 0;
482
483                 lov_set_add_req(req, set);
484         }
485         if (!set->set_count)
486                 GOTO(out_set, rc = -EIO);
487         *reqset = set;
488         RETURN(rc);
489 out_set:
490         lov_fini_match_set(set, mode, 0);
491         RETURN(rc);
492 }
493
494 int lov_fini_cancel_set(struct lov_request_set *set)
495 {
496         int rc = 0;
497         ENTRY;
498
499         if (set == NULL)
500                 RETURN(0);
501
502         LASSERT(set->set_exp);
503         if (set->set_lockh)
504                 lov_llh_put(set->set_lockh);
505
506         lov_put_reqset(set);
507
508         RETURN(rc);
509 }
510
511 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
512                         struct lov_stripe_md *lsm, __u32 mode,
513                         struct lustre_handle *lockh,
514                         struct lov_request_set **reqset)
515 {
516         struct lov_request_set *set;
517         int i, rc = 0;
518         ENTRY;
519
520         OBD_ALLOC(set, sizeof(*set));
521         if (set == NULL)
522                 RETURN(-ENOMEM);
523         lov_init_set(set);
524
525         set->set_exp = exp;
526         set->set_oi = oinfo;
527         set->set_oi->oi_md = lsm;
528         set->set_lockh = lov_handle2llh(lockh);
529         if (set->set_lockh == NULL) {
530                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
531                 GOTO(out_set, rc = -EINVAL);
532         }
533         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
534
535         for (i = 0; i < lsm->lsm_stripe_count; i++){
536                 struct lov_request *req;
537                 struct lustre_handle *lov_lockhp;
538                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
539
540                 lov_lockhp = set->set_lockh->llh_handles + i;
541                 if (!lustre_handle_is_used(lov_lockhp)) {
542                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
543                                loi->loi_ost_idx, loi->loi_id);
544                         continue;
545                 }
546
547                 OBD_ALLOC(req, sizeof(*req));
548                 if (req == NULL)
549                         GOTO(out_set, rc = -ENOMEM);
550
551                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
552                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
553                 if (req->rq_oi.oi_md == NULL) {
554                         OBD_FREE(req, sizeof(*req));
555                         GOTO(out_set, rc = -ENOMEM);
556                 }
557
558                 req->rq_idx = loi->loi_ost_idx;
559                 req->rq_stripe = i;
560
561                 /* XXX LOV STACKING: submd should be from the subobj */
562                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
563                 req->rq_oi.oi_md->lsm_stripe_count = 0;
564
565                 lov_set_add_req(req, set);
566         }
567         if (!set->set_count)
568                 GOTO(out_set, rc = -EIO);
569         *reqset = set;
570         RETURN(rc);
571 out_set:
572         lov_fini_cancel_set(set);
573         RETURN(rc);
574 }
575 static int common_attr_done(struct lov_request_set *set)
576 {
577         cfs_list_t *pos;
578         struct lov_request *req;
579         struct obdo *tmp_oa;
580         int rc = 0, attrset = 0;
581         ENTRY;
582
583         LASSERT(set->set_oi != NULL);
584
585         if (set->set_oi->oi_oa == NULL)
586                 RETURN(0);
587
588         if (!cfs_atomic_read(&set->set_success))
589                 RETURN(-EIO);
590
591         OBDO_ALLOC(tmp_oa);
592         if (tmp_oa == NULL)
593                 GOTO(out, rc = -ENOMEM);
594
595         cfs_list_for_each (pos, &set->set_list) {
596                 req = cfs_list_entry(pos, struct lov_request, rq_link);
597
598                 if (!req->rq_complete || req->rq_rc)
599                         continue;
600                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
601                         continue;
602                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
603                                 req->rq_oi.oi_oa->o_valid,
604                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
605         }
606         if (!attrset) {
607                 CERROR("No stripes had valid attrs\n");
608                 rc = -EIO;
609         }
610         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
611             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
612                 /* When we take attributes of some epoch, we require all the
613                  * ost to be active. */
614                 CERROR("Not all the stripes had valid attrs\n");
615                 GOTO(out, rc = -EIO);
616         }
617
618         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
619         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
620 out:
621         if (tmp_oa)
622                 OBDO_FREE(tmp_oa);
623         RETURN(rc);
624
625 }
626
627 static int brw_done(struct lov_request_set *set)
628 {
629         struct lov_stripe_md *lsm = set->set_oi->oi_md;
630         struct lov_oinfo     *loi = NULL;
631         cfs_list_t *pos;
632         struct lov_request *req;
633         ENTRY;
634
635         cfs_list_for_each (pos, &set->set_list) {
636                 req = cfs_list_entry(pos, struct lov_request, rq_link);
637
638                 if (!req->rq_complete || req->rq_rc)
639                         continue;
640
641                 loi = lsm->lsm_oinfo[req->rq_stripe];
642
643                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
644                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
645         }
646
647         RETURN(0);
648 }
649
650 int lov_fini_brw_set(struct lov_request_set *set)
651 {
652         int rc = 0;
653         ENTRY;
654
655         if (set == NULL)
656                 RETURN(0);
657         LASSERT(set->set_exp);
658         if (cfs_atomic_read(&set->set_completes)) {
659                 rc = brw_done(set);
660                 /* FIXME update qos data here */
661         }
662         lov_put_reqset(set);
663
664         RETURN(rc);
665 }
666
667 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
668                      obd_count oa_bufs, struct brw_page *pga,
669                      struct obd_trans_info *oti,
670                      struct lov_request_set **reqset)
671 {
672         struct {
673                 obd_count       index;
674                 obd_count       count;
675                 obd_count       off;
676         } *info = NULL;
677         struct lov_request_set *set;
678         struct lov_obd *lov = &exp->exp_obd->u.lov;
679         int rc = 0, i, shift;
680         ENTRY;
681
682         OBD_ALLOC(set, sizeof(*set));
683         if (set == NULL)
684                 RETURN(-ENOMEM);
685         lov_init_set(set);
686
687         set->set_exp = exp;
688         set->set_oti = oti;
689         set->set_oi = oinfo;
690         set->set_oabufs = oa_bufs;
691         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
692         if (!set->set_pga)
693                 GOTO(out, rc = -ENOMEM);
694
695         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
696         if (!info)
697                 GOTO(out, rc = -ENOMEM);
698
699         /* calculate the page count for each stripe */
700         for (i = 0; i < oa_bufs; i++) {
701                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
702                 info[stripe].count++;
703         }
704
705         /* alloc and initialize lov request */
706         shift = 0;
707         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
708                 struct lov_oinfo *loi = NULL;
709                 struct lov_request *req;
710
711                 if (info[i].count == 0)
712                         continue;
713
714                 loi = oinfo->oi_md->lsm_oinfo[i];
715                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
716                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
717                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
718                         GOTO(out, rc = -EIO);
719                 }
720
721                 OBD_ALLOC(req, sizeof(*req));
722                 if (req == NULL)
723                         GOTO(out, rc = -ENOMEM);
724
725                 OBDO_ALLOC(req->rq_oi.oi_oa);
726                 if (req->rq_oi.oi_oa == NULL) {
727                         OBD_FREE(req, sizeof(*req));
728                         GOTO(out, rc = -ENOMEM);
729                 }
730
731                 if (oinfo->oi_oa) {
732                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
733                                sizeof(*req->rq_oi.oi_oa));
734                 }
735                 req->rq_oi.oi_oa->o_id = loi->loi_id;
736                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
737                 req->rq_oi.oi_oa->o_stripe_idx = i;
738
739                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
740                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
741                 if (req->rq_oi.oi_md == NULL) {
742                         OBDO_FREE(req->rq_oi.oi_oa);
743                         OBD_FREE(req, sizeof(*req));
744                         GOTO(out, rc = -ENOMEM);
745                 }
746
747                 req->rq_idx = loi->loi_ost_idx;
748                 req->rq_stripe = i;
749
750                 /* XXX LOV STACKING */
751                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
752                 req->rq_oabufs = info[i].count;
753                 req->rq_pgaidx = shift;
754                 shift += req->rq_oabufs;
755
756                 /* remember the index for sort brw_page array */
757                 info[i].index = req->rq_pgaidx;
758
759                 req->rq_oi.oi_capa = oinfo->oi_capa;
760
761                 lov_set_add_req(req, set);
762         }
763         if (!set->set_count)
764                 GOTO(out, rc = -EIO);
765
766         /* rotate & sort the brw_page array */
767         for (i = 0; i < oa_bufs; i++) {
768                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
769
770                 shift = info[stripe].index + info[stripe].off;
771                 LASSERT(shift < oa_bufs);
772                 set->set_pga[shift] = pga[i];
773                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
774                                   &set->set_pga[shift].off);
775                 info[stripe].off++;
776         }
777 out:
778         if (info)
779                 OBD_FREE_LARGE(info,
780                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
781
782         if (rc == 0)
783                 *reqset = set;
784         else
785                 lov_fini_brw_set(set);
786
787         RETURN(rc);
788 }
789
790 int lov_fini_getattr_set(struct lov_request_set *set)
791 {
792         int rc = 0;
793         ENTRY;
794
795         if (set == NULL)
796                 RETURN(0);
797         LASSERT(set->set_exp);
798         if (cfs_atomic_read(&set->set_completes))
799                 rc = common_attr_done(set);
800
801         lov_put_reqset(set);
802
803         RETURN(rc);
804 }
805
806 /* The callback for osc_getattr_async that finilizes a request info when a
807  * response is received. */
808 static int cb_getattr_update(void *cookie, int rc)
809 {
810         struct obd_info *oinfo = cookie;
811         struct lov_request *lovreq;
812         lovreq = container_of(oinfo, struct lov_request, rq_oi);
813         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
814 }
815
816 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
817                          struct lov_request_set **reqset)
818 {
819         struct lov_request_set *set;
820         struct lov_obd *lov = &exp->exp_obd->u.lov;
821         int rc = 0, i;
822         ENTRY;
823
824         OBD_ALLOC(set, sizeof(*set));
825         if (set == NULL)
826                 RETURN(-ENOMEM);
827         lov_init_set(set);
828
829         set->set_exp = exp;
830         set->set_oi = oinfo;
831
832         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
833                 struct lov_oinfo *loi;
834                 struct lov_request *req;
835
836                 loi = oinfo->oi_md->lsm_oinfo[i];
837                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
838                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
839                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
840                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
841                                 /* SOM requires all the OSTs to be active. */
842                                 GOTO(out_set, rc = -EIO);
843                         continue;
844                 }
845
846                 OBD_ALLOC(req, sizeof(*req));
847                 if (req == NULL)
848                         GOTO(out_set, rc = -ENOMEM);
849
850                 req->rq_stripe = i;
851                 req->rq_idx = loi->loi_ost_idx;
852
853                 OBDO_ALLOC(req->rq_oi.oi_oa);
854                 if (req->rq_oi.oi_oa == NULL) {
855                         OBD_FREE(req, sizeof(*req));
856                         GOTO(out_set, rc = -ENOMEM);
857                 }
858                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
859                        sizeof(*req->rq_oi.oi_oa));
860                 req->rq_oi.oi_oa->o_id = loi->loi_id;
861                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
862                 req->rq_oi.oi_cb_up = cb_getattr_update;
863                 req->rq_oi.oi_capa = oinfo->oi_capa;
864
865                 lov_set_add_req(req, set);
866         }
867         if (!set->set_count)
868                 GOTO(out_set, rc = -EIO);
869         *reqset = set;
870         RETURN(rc);
871 out_set:
872         lov_fini_getattr_set(set);
873         RETURN(rc);
874 }
875
876 int lov_fini_destroy_set(struct lov_request_set *set)
877 {
878         ENTRY;
879
880         if (set == NULL)
881                 RETURN(0);
882         LASSERT(set->set_exp);
883         if (cfs_atomic_read(&set->set_completes)) {
884                 /* FIXME update qos data here */
885         }
886
887         lov_put_reqset(set);
888
889         RETURN(0);
890 }
891
892 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
893                          struct obdo *src_oa, struct lov_stripe_md *lsm,
894                          struct obd_trans_info *oti,
895                          struct lov_request_set **reqset)
896 {
897         struct lov_request_set *set;
898         struct lov_obd *lov = &exp->exp_obd->u.lov;
899         int rc = 0, i;
900         ENTRY;
901
902         OBD_ALLOC(set, sizeof(*set));
903         if (set == NULL)
904                 RETURN(-ENOMEM);
905         lov_init_set(set);
906
907         set->set_exp = exp;
908         set->set_oi = oinfo;
909         set->set_oi->oi_md = lsm;
910         set->set_oi->oi_oa = src_oa;
911         set->set_oti = oti;
912         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
913                 set->set_cookies = oti->oti_logcookies;
914
915         for (i = 0; i < lsm->lsm_stripe_count; i++) {
916                 struct lov_oinfo *loi;
917                 struct lov_request *req;
918
919                 loi = lsm->lsm_oinfo[i];
920                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
921                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
922                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
923                         continue;
924                 }
925
926                 OBD_ALLOC(req, sizeof(*req));
927                 if (req == NULL)
928                         GOTO(out_set, rc = -ENOMEM);
929
930                 req->rq_stripe = i;
931                 req->rq_idx = loi->loi_ost_idx;
932
933                 OBDO_ALLOC(req->rq_oi.oi_oa);
934                 if (req->rq_oi.oi_oa == NULL) {
935                         OBD_FREE(req, sizeof(*req));
936                         GOTO(out_set, rc = -ENOMEM);
937                 }
938                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
939                 req->rq_oi.oi_oa->o_id = loi->loi_id;
940                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
941                 lov_set_add_req(req, set);
942         }
943         if (!set->set_count)
944                 GOTO(out_set, rc = -EIO);
945         *reqset = set;
946         RETURN(rc);
947 out_set:
948         lov_fini_destroy_set(set);
949         RETURN(rc);
950 }
951
952 int lov_fini_setattr_set(struct lov_request_set *set)
953 {
954         int rc = 0;
955         ENTRY;
956
957         if (set == NULL)
958                 RETURN(0);
959         LASSERT(set->set_exp);
960         if (cfs_atomic_read(&set->set_completes)) {
961                 rc = common_attr_done(set);
962                 /* FIXME update qos data here */
963         }
964
965         lov_put_reqset(set);
966         RETURN(rc);
967 }
968
969 int lov_update_setattr_set(struct lov_request_set *set,
970                            struct lov_request *req, int rc)
971 {
972         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
973         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
974         ENTRY;
975
976         lov_update_set(set, req, rc);
977
978         /* grace error on inactive ost */
979         if (rc && !(lov->lov_tgts[req->rq_idx] &&
980                     lov->lov_tgts[req->rq_idx]->ltd_active))
981                 rc = 0;
982
983         if (rc == 0) {
984                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
985                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
986                                 req->rq_oi.oi_oa->o_ctime;
987                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
988                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
989                                 req->rq_oi.oi_oa->o_mtime;
990                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
991                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
992                                 req->rq_oi.oi_oa->o_atime;
993         }
994
995         RETURN(rc);
996 }
997
998 /* The callback for osc_setattr_async that finilizes a request info when a
999  * response is received. */
1000 static int cb_setattr_update(void *cookie, int rc)
1001 {
1002         struct obd_info *oinfo = cookie;
1003         struct lov_request *lovreq;
1004         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1005         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1006 }
1007
1008 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1009                          struct obd_trans_info *oti,
1010                          struct lov_request_set **reqset)
1011 {
1012         struct lov_request_set *set;
1013         struct lov_obd *lov = &exp->exp_obd->u.lov;
1014         int rc = 0, i;
1015         ENTRY;
1016
1017         OBD_ALLOC(set, sizeof(*set));
1018         if (set == NULL)
1019                 RETURN(-ENOMEM);
1020         lov_init_set(set);
1021
1022         set->set_exp = exp;
1023         set->set_oti = oti;
1024         set->set_oi = oinfo;
1025         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1026                 set->set_cookies = oti->oti_logcookies;
1027
1028         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1029                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1030                 struct lov_request *req;
1031
1032                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1033                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1034                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1035                         continue;
1036                 }
1037
1038                 OBD_ALLOC(req, sizeof(*req));
1039                 if (req == NULL)
1040                         GOTO(out_set, rc = -ENOMEM);
1041                 req->rq_stripe = i;
1042                 req->rq_idx = loi->loi_ost_idx;
1043
1044                 OBDO_ALLOC(req->rq_oi.oi_oa);
1045                 if (req->rq_oi.oi_oa == NULL) {
1046                         OBD_FREE(req, sizeof(*req));
1047                         GOTO(out_set, rc = -ENOMEM);
1048                 }
1049                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1050                        sizeof(*req->rq_oi.oi_oa));
1051                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1052                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1053                 req->rq_oi.oi_oa->o_stripe_idx = i;
1054                 req->rq_oi.oi_cb_up = cb_setattr_update;
1055                 req->rq_oi.oi_capa = oinfo->oi_capa;
1056
1057                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1058                         int off = lov_stripe_offset(oinfo->oi_md,
1059                                                     oinfo->oi_oa->o_size, i,
1060                                                     &req->rq_oi.oi_oa->o_size);
1061
1062                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1063                                 req->rq_oi.oi_oa->o_size--;
1064
1065                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1066                                i, req->rq_oi.oi_oa->o_size,
1067                                oinfo->oi_oa->o_size);
1068                 }
1069                 lov_set_add_req(req, set);
1070         }
1071         if (!set->set_count)
1072                 GOTO(out_set, rc = -EIO);
1073         *reqset = set;
1074         RETURN(rc);
1075 out_set:
1076         lov_fini_setattr_set(set);
1077         RETURN(rc);
1078 }
1079
1080 int lov_fini_punch_set(struct lov_request_set *set)
1081 {
1082         int rc = 0;
1083         ENTRY;
1084
1085         if (set == NULL)
1086                 RETURN(0);
1087         LASSERT(set->set_exp);
1088         if (cfs_atomic_read(&set->set_completes)) {
1089                 rc = -EIO;
1090                 /* FIXME update qos data here */
1091                 if (cfs_atomic_read(&set->set_success))
1092                         rc = common_attr_done(set);
1093         }
1094
1095         lov_put_reqset(set);
1096
1097         RETURN(rc);
1098 }
1099
1100 int lov_update_punch_set(struct lov_request_set *set,
1101                          struct lov_request *req, int rc)
1102 {
1103         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1104         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1105         ENTRY;
1106
1107         lov_update_set(set, req, rc);
1108
1109         /* grace error on inactive ost */
1110         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1111                 rc = 0;
1112
1113         if (rc == 0) {
1114                 lov_stripe_lock(lsm);
1115                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1116                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1117                                 req->rq_oi.oi_oa->o_blocks;
1118                 }
1119
1120                 lov_stripe_unlock(lsm);
1121         }
1122
1123         RETURN(rc);
1124 }
1125
1126 /* The callback for osc_punch that finilizes a request info when a response
1127  * is received. */
1128 static int cb_update_punch(void *cookie, int rc)
1129 {
1130         struct obd_info *oinfo = cookie;
1131         struct lov_request *lovreq;
1132         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1133         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1134 }
1135
1136 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1137                        struct obd_trans_info *oti,
1138                        struct lov_request_set **reqset)
1139 {
1140         struct lov_request_set *set;
1141         struct lov_obd *lov = &exp->exp_obd->u.lov;
1142         int rc = 0, i;
1143         ENTRY;
1144
1145         OBD_ALLOC(set, sizeof(*set));
1146         if (set == NULL)
1147                 RETURN(-ENOMEM);
1148         lov_init_set(set);
1149
1150         set->set_oi = oinfo;
1151         set->set_exp = exp;
1152
1153         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1154                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1155                 struct lov_request *req;
1156                 obd_off rs, re;
1157
1158                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1159                                            oinfo->oi_policy.l_extent.start,
1160                                            oinfo->oi_policy.l_extent.end,
1161                                            &rs, &re))
1162                         continue;
1163
1164                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1165                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1166                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1167                         GOTO(out_set, rc = -EIO);
1168                 }
1169
1170                 OBD_ALLOC(req, sizeof(*req));
1171                 if (req == NULL)
1172                         GOTO(out_set, rc = -ENOMEM);
1173                 req->rq_stripe = i;
1174                 req->rq_idx = loi->loi_ost_idx;
1175
1176                 OBDO_ALLOC(req->rq_oi.oi_oa);
1177                 if (req->rq_oi.oi_oa == NULL) {
1178                         OBD_FREE(req, sizeof(*req));
1179                         GOTO(out_set, rc = -ENOMEM);
1180                 }
1181                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1182                        sizeof(*req->rq_oi.oi_oa));
1183                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1184                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1185                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1186
1187                 req->rq_oi.oi_oa->o_stripe_idx = i;
1188                 req->rq_oi.oi_cb_up = cb_update_punch;
1189
1190                 req->rq_oi.oi_policy.l_extent.start = rs;
1191                 req->rq_oi.oi_policy.l_extent.end = re;
1192                 req->rq_oi.oi_policy.l_extent.gid = -1;
1193
1194                 req->rq_oi.oi_capa = oinfo->oi_capa;
1195
1196                 lov_set_add_req(req, set);
1197         }
1198         if (!set->set_count)
1199                 GOTO(out_set, rc = -EIO);
1200         *reqset = set;
1201         RETURN(rc);
1202 out_set:
1203         lov_fini_punch_set(set);
1204         RETURN(rc);
1205 }
1206
1207 int lov_fini_sync_set(struct lov_request_set *set)
1208 {
1209         int rc = 0;
1210         ENTRY;
1211
1212         if (set == NULL)
1213                 RETURN(0);
1214         LASSERT(set->set_exp);
1215         if (cfs_atomic_read(&set->set_completes)) {
1216                 if (!cfs_atomic_read(&set->set_success))
1217                         rc = -EIO;
1218                 /* FIXME update qos data here */
1219         }
1220
1221         lov_put_reqset(set);
1222
1223         RETURN(rc);
1224 }
1225
1226 /* The callback for osc_sync that finilizes a request info when a
1227  * response is recieved. */
1228 static int cb_sync_update(void *cookie, int rc)
1229 {
1230         struct obd_info *oinfo = cookie;
1231         struct lov_request *lovreq;
1232
1233         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1234         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1235 }
1236
1237 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1238                       obd_off start, obd_off end,
1239                       struct lov_request_set **reqset)
1240 {
1241         struct lov_request_set *set;
1242         struct lov_obd *lov = &exp->exp_obd->u.lov;
1243         int rc = 0, i;
1244         ENTRY;
1245
1246         OBD_ALLOC_PTR(set);
1247         if (set == NULL)
1248                 RETURN(-ENOMEM);
1249         lov_init_set(set);
1250
1251         set->set_exp = exp;
1252         set->set_oi = oinfo;
1253
1254         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1255                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1256                 struct lov_request *req;
1257                 obd_off rs, re;
1258
1259                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1260                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1261                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1262                         continue;
1263                 }
1264
1265                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1266                                            &re))
1267                         continue;
1268
1269                 OBD_ALLOC_PTR(req);
1270                 if (req == NULL)
1271                         GOTO(out_set, rc = -ENOMEM);
1272                 req->rq_stripe = i;
1273                 req->rq_idx = loi->loi_ost_idx;
1274
1275                 OBDO_ALLOC(req->rq_oi.oi_oa);
1276                 if (req->rq_oi.oi_oa == NULL) {
1277                         OBD_FREE(req, sizeof(*req));
1278                         GOTO(out_set, rc = -ENOMEM);
1279                 }
1280                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1281                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1282                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1283                 req->rq_oi.oi_oa->o_stripe_idx = i;
1284
1285                 req->rq_oi.oi_policy.l_extent.start = rs;
1286                 req->rq_oi.oi_policy.l_extent.end = re;
1287                 req->rq_oi.oi_policy.l_extent.gid = -1;
1288                 req->rq_oi.oi_cb_up = cb_sync_update;
1289
1290                 lov_set_add_req(req, set);
1291         }
1292         if (!set->set_count)
1293                 GOTO(out_set, rc = -EIO);
1294         *reqset = set;
1295         RETURN(rc);
1296 out_set:
1297         lov_fini_sync_set(set);
1298         RETURN(rc);
1299 }
1300
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating accumulate: add @add to @tot, pinning @tot at LOV_U64_MAX when
 * the sum wraps around.  Both arguments are evaluated more than once.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1309
1310 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1311 {
1312         ENTRY;
1313
1314         if (success) {
1315                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1316                                                            LOV_MAGIC, 0);
1317                 if (osfs->os_files != LOV_U64_MAX)
1318                         lov_do_div64(osfs->os_files, expected_stripes);
1319                 if (osfs->os_ffree != LOV_U64_MAX)
1320                         lov_do_div64(osfs->os_ffree, expected_stripes);
1321
1322                 spin_lock(&obd->obd_osfs_lock);
1323                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1324                 obd->obd_osfs_age = cfs_time_current_64();
1325                 spin_unlock(&obd->obd_osfs_lock);
1326                 RETURN(0);
1327         }
1328
1329         RETURN(-EIO);
1330 }
1331
1332 int lov_fini_statfs_set(struct lov_request_set *set)
1333 {
1334         int rc = 0;
1335         ENTRY;
1336
1337         if (set == NULL)
1338                 RETURN(0);
1339
1340         if (cfs_atomic_read(&set->set_completes)) {
1341                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1342                                      cfs_atomic_read(&set->set_success));
1343         }
1344         lov_put_reqset(set);
1345         RETURN(rc);
1346 }
1347
1348 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1349                        int success)
1350 {
1351         int shift = 0, quit = 0;
1352         __u64 tmp;
1353
1354         if (success == 0) {
1355                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1356         } else {
1357                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1358                         /* assume all block sizes are always powers of 2 */
1359                         /* get the bits difference */
1360                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1361                         for (shift = 0; shift <= 64; ++shift) {
1362                                 if (tmp & 1) {
1363                                         if (quit)
1364                                                 break;
1365                                         else
1366                                                 quit = 1;
1367                                         shift = 0;
1368                                 }
1369                                 tmp >>= 1;
1370                         }
1371                 }
1372
1373                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1374                         osfs->os_bsize = lov_sfs->os_bsize;
1375
1376                         osfs->os_bfree  >>= shift;
1377                         osfs->os_bavail >>= shift;
1378                         osfs->os_blocks >>= shift;
1379                 } else if (shift != 0) {
1380                         lov_sfs->os_bfree  >>= shift;
1381                         lov_sfs->os_bavail >>= shift;
1382                         lov_sfs->os_blocks >>= shift;
1383                 }
1384 #ifdef MIN_DF
1385                 /* Sandia requested that df (and so, statfs) only
1386                    returned minimal available space on
1387                    a single OST, so people would be able to
1388                    write this much data guaranteed. */
1389                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1390                         /* Presumably if new bavail is smaller,
1391                            new bfree is bigger as well */
1392                         osfs->os_bfree = lov_sfs->os_bfree;
1393                         osfs->os_bavail = lov_sfs->os_bavail;
1394                 }
1395 #else
1396                 osfs->os_bfree += lov_sfs->os_bfree;
1397                 osfs->os_bavail += lov_sfs->os_bavail;
1398 #endif
1399                 osfs->os_blocks += lov_sfs->os_blocks;
1400                 /* XXX not sure about this one - depends on policy.
1401                  *   - could be minimum if we always stripe on all OBDs
1402                  *     (but that would be wrong for any other policy,
1403                  *     if one of the OBDs has no more objects left)
1404                  *   - could be sum if we stripe whole objects
1405                  *   - could be average, just to give a nice number
1406                  *
1407                  * To give a "reasonable" (if not wholly accurate)
1408                  * number, we divide the total number of free objects
1409                  * by expected stripe count (watch out for overflow).
1410                  */
1411                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1412                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1413         }
1414 }
1415
1416 /* The callback for osc_statfs_async that finilizes a request info when a
1417  * response is received. */
1418 static int cb_statfs_update(void *cookie, int rc)
1419 {
1420         struct obd_info *oinfo = cookie;
1421         struct lov_request *lovreq;
1422         struct lov_request_set *set;
1423         struct obd_statfs *osfs, *lov_sfs;
1424         struct lov_obd *lov;
1425         struct lov_tgt_desc *tgt;
1426         struct obd_device *lovobd, *tgtobd;
1427         int success;
1428         ENTRY;
1429
1430         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1431         set = lovreq->rq_rqset;
1432         lovobd = set->set_obd;
1433         lov = &lovobd->u.lov;
1434         osfs = set->set_oi->oi_osfs;
1435         lov_sfs = oinfo->oi_osfs;
1436         success = cfs_atomic_read(&set->set_success);
1437         /* XXX: the same is done in lov_update_common_set, however
1438            lovset->set_exp is not initialized. */
1439         lov_update_set(set, lovreq, rc);
1440         if (rc)
1441                 GOTO(out, rc);
1442
1443         obd_getref(lovobd);
1444         tgt = lov->lov_tgts[lovreq->rq_idx];
1445         if (!tgt || !tgt->ltd_active)
1446                 GOTO(out_update, rc);
1447
1448         tgtobd = class_exp2obd(tgt->ltd_exp);
1449         spin_lock(&tgtobd->obd_osfs_lock);
1450         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1451         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1452                 tgtobd->obd_osfs_age = cfs_time_current_64();
1453         spin_unlock(&tgtobd->obd_osfs_lock);
1454
1455 out_update:
1456         lov_update_statfs(osfs, lov_sfs, success);
1457         obd_putref(lovobd);
1458
1459 out:
1460         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1461             lov_set_finished(set, 0)) {
1462                 lov_statfs_interpret(NULL, set, set->set_count !=
1463                                      cfs_atomic_read(&set->set_success));
1464         }
1465
1466         RETURN(0);
1467 }
1468
1469 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1470                         struct lov_request_set **reqset)
1471 {
1472         struct lov_request_set *set;
1473         struct lov_obd *lov = &obd->u.lov;
1474         int rc = 0, i;
1475         ENTRY;
1476
1477         OBD_ALLOC(set, sizeof(*set));
1478         if (set == NULL)
1479                 RETURN(-ENOMEM);
1480         lov_init_set(set);
1481
1482         set->set_obd = obd;
1483         set->set_oi = oinfo;
1484
1485         /* We only get block data from the OBD */
1486         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1487                 struct lov_request *req;
1488
1489                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1490                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1491                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1492                         continue;
1493                 }
1494
1495                 /* skip targets that have been explicitely disabled by the
1496                  * administrator */
1497                 if (!lov->lov_tgts[i]->ltd_exp) {
1498                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1499                         continue;
1500                 }
1501
1502                 OBD_ALLOC(req, sizeof(*req));
1503                 if (req == NULL)
1504                         GOTO(out_set, rc = -ENOMEM);
1505
1506                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1507                 if (req->rq_oi.oi_osfs == NULL) {
1508                         OBD_FREE(req, sizeof(*req));
1509                         GOTO(out_set, rc = -ENOMEM);
1510                 }
1511
1512                 req->rq_idx = i;
1513                 req->rq_oi.oi_cb_up = cb_statfs_update;
1514                 req->rq_oi.oi_flags = oinfo->oi_flags;
1515
1516                 lov_set_add_req(req, set);
1517         }
1518         if (!set->set_count)
1519                 GOTO(out_set, rc = -EIO);
1520         *reqset = set;
1521         RETURN(rc);
1522 out_set:
1523         lov_fini_statfs_set(set);
1524         RETURN(rc);
1525 }