Whamcloud - gitweb
3fe80804ba3df63469dac063874a823dc13fd0e3
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37
38 struct update_request *out_find_update(struct thandle *th,
39                                        struct dt_device *dt_dev)
40 {
41         struct update_request   *update;
42
43         list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
44                 if (update->ur_dt == dt_dev)
45                         return update;
46         }
47
48         return NULL;
49 }
50 EXPORT_SYMBOL(out_find_update);
51
52 void out_destroy_update_req(struct update_request *update)
53 {
54         if (update == NULL)
55                 return;
56
57         LASSERT(list_empty(&update->ur_cb_items));
58
59         list_del(&update->ur_list);
60         if (update->ur_buf != NULL)
61                 OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
62
63         OBD_FREE_PTR(update);
64 }
65 EXPORT_SYMBOL(out_destroy_update_req);
66
67 struct update_request *out_create_update_req(struct dt_device *dt)
68 {
69         struct update_request *update;
70
71         OBD_ALLOC_PTR(update);
72         if (update == NULL)
73                 return ERR_PTR(-ENOMEM);
74
75         OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
76         if (update->ur_buf == NULL) {
77                 OBD_FREE_PTR(update);
78
79                 return ERR_PTR(-ENOMEM);
80         }
81
82         INIT_LIST_HEAD(&update->ur_list);
83         update->ur_dt = dt;
84         update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
85         update->ur_buf->ub_count = 0;
86         INIT_LIST_HEAD(&update->ur_cb_items);
87
88         return update;
89 }
90 EXPORT_SYMBOL(out_create_update_req);
91
92 /**
93  * Find one loc in th_dev/dev_obj_update for the update,
94  * Because only one thread can access this thandle, no need
95  * lock now.
96  */
97 struct update_request *out_find_create_update_loc(struct thandle *th,
98                                                   struct dt_object *dt)
99 {
100         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
101         struct update_request   *update;
102         ENTRY;
103
104         update = out_find_update(th, dt_dev);
105         if (update != NULL)
106                 RETURN(update);
107
108         update = out_create_update_req(dt_dev);
109         if (IS_ERR(update))
110                 RETURN(update);
111
112         list_add_tail(&update->ur_list, &th->th_remote_update_list);
113
114         RETURN(update);
115 }
116 EXPORT_SYMBOL(out_find_create_update_loc);
117
118 int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
119                         const struct update_buf *ubuf, int ubuf_len,
120                         struct ptlrpc_request **reqp)
121 {
122         struct ptlrpc_request  *req;
123         struct update_buf      *tmp;
124         int                     rc;
125         ENTRY;
126
127         req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
128         if (req == NULL)
129                 RETURN(-ENOMEM);
130
131         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
132                              UPDATE_BUFFER_SIZE);
133
134         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
135         if (rc != 0) {
136                 ptlrpc_req_finished(req);
137                 RETURN(rc);
138         }
139
140         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
141                              UPDATE_BUFFER_SIZE);
142
143         tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
144         memcpy(tmp, ubuf, ubuf_len);
145         ptlrpc_request_set_replen(req);
146         req->rq_request_portal = OUT_PORTAL;
147         req->rq_reply_portal = OSC_REPLY_PORTAL;
148         *reqp = req;
149
150         RETURN(rc);
151 }
152 EXPORT_SYMBOL(out_prep_update_req);
153
154 int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
155                     struct update_request *update,
156                     struct ptlrpc_request **reqp)
157 {
158         struct ptlrpc_request   *req = NULL;
159         int                      rc;
160         ENTRY;
161
162         rc = out_prep_update_req(env, imp, update->ur_buf,
163                                  UPDATE_BUFFER_SIZE, &req);
164         if (rc != 0)
165                 RETURN(rc);
166
167         /* Note: some dt index api might return non-zero result here, like
168          * osd_index_ea_lookup, so we should only check rc < 0 here */
169         rc = ptlrpc_queue_wait(req);
170         if (rc < 0) {
171                 ptlrpc_req_finished(req);
172                 update->ur_rc = rc;
173                 RETURN(rc);
174         }
175
176         if (reqp != NULL) {
177                 *reqp = req;
178         } else {
179                 update->ur_rc = rc;
180                 ptlrpc_req_finished(req);
181         }
182
183         RETURN(rc);
184 }
185 EXPORT_SYMBOL(out_remote_sync);
186
187 int out_insert_update(const struct lu_env *env, struct update_request *update,
188                       int op, const struct lu_fid *fid, int count,
189                       int *lens, const char **bufs)
190 {
191         struct update_buf    *ubuf = update->ur_buf;
192         struct update        *obj_update;
193         char                 *ptr;
194         int                   i;
195         int                   update_length;
196         ENTRY;
197
198         obj_update = (struct update *)((char *)ubuf +
199                       cfs_size_round(update_buf_size(ubuf)));
200
201         /* Check update size to make sure it can fit into the buffer */
202         update_length = cfs_size_round(offsetof(struct update,
203                                        u_bufs[0]));
204         for (i = 0; i < count; i++)
205                 update_length += cfs_size_round(lens[i]);
206
207         if (cfs_size_round(update_buf_size(ubuf)) + update_length >
208             UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS)
209                 RETURN(-E2BIG);
210
211         if (count > UPDATE_BUF_COUNT)
212                 RETURN(-E2BIG);
213
214         /* fill the update into the update buffer */
215         fid_cpu_to_le(&obj_update->u_fid, fid);
216         obj_update->u_type = cpu_to_le32(op);
217         obj_update->u_batchid = update->ur_batchid;
218         for (i = 0; i < count; i++)
219                 obj_update->u_lens[i] = cpu_to_le32(lens[i]);
220
221         ptr = (char *)obj_update +
222                         cfs_size_round(offsetof(struct update, u_bufs[0]));
223         for (i = 0; i < count; i++)
224                 LOGL(bufs[i], lens[i], ptr);
225
226         ubuf->ub_count++;
227
228         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
229                update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
230                ubuf->ub_count, op, count, update_buf_size(ubuf));
231
232         RETURN(0);
233 }
234 EXPORT_SYMBOL(out_insert_update);