Whamcloud - gitweb
3b560f8ad7f4dbeaecb598dcd160b31e80691fce
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "osp_internal.h"
35
36 struct osp_async_update_args {
37         struct update_request   *oaua_update;
38         unsigned int             oaua_fc:1;
39 };
40
41 struct osp_async_update_item {
42         struct list_head                 oaui_list;
43         struct osp_object               *oaui_obj;
44         void                            *oaui_data;
45         osp_async_update_interpterer_t   oaui_interpterer;
46 };
47
48 static struct osp_async_update_item *
49 osp_async_update_item_init(struct osp_object *obj, void *data,
50                            osp_async_update_interpterer_t interpterer)
51 {
52         struct osp_async_update_item *oaui;
53
54         OBD_ALLOC_PTR(oaui);
55         if (oaui == NULL)
56                 return NULL;
57
58         lu_object_get(osp2lu_obj(obj));
59         INIT_LIST_HEAD(&oaui->oaui_list);
60         oaui->oaui_obj = obj;
61         oaui->oaui_data = data;
62         oaui->oaui_interpterer = interpterer;
63
64         return oaui;
65 }
66
67 static void osp_async_update_item_fini(const struct lu_env *env,
68                                        struct osp_async_update_item *oaui)
69 {
70         LASSERT(list_empty(&oaui->oaui_list));
71
72         lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
73         OBD_FREE_PTR(oaui);
74 }
75
76 static int osp_async_update_interpret(const struct lu_env *env,
77                                       struct ptlrpc_request *req,
78                                       void *arg, int rc)
79 {
80         struct update_reply             *reply  = NULL;
81         struct osp_async_update_args    *oaua   = arg;
82         struct update_request           *update = oaua->oaua_update;
83         struct osp_async_update_item    *oaui;
84         struct osp_async_update_item    *next;
85         struct osp_device               *osp    = dt2osp_dev(update->ur_dt);
86         int                              count  = 0;
87         int                              index  = 0;
88         int                              rc1    = 0;
89
90         if (oaua->oaua_fc)
91                 up(&osp->opd_async_fc_sem);
92
93         if (rc == 0 || req->rq_repmsg != NULL) {
94                 reply = req_capsule_server_sized_get(&req->rq_pill,
95                                                      &RMF_UPDATE_REPLY,
96                                                      UPDATE_BUFFER_SIZE);
97                 if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
98                         rc1 = -EPROTO;
99                 else
100                         count = reply->ur_count;
101         } else {
102                 rc1 = rc;
103         }
104
105         list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
106                 list_del_init(&oaui->oaui_list);
107                 if (index < count && reply->ur_lens[index] > 0) {
108                         char *ptr = update_get_buf_internal(reply, index, NULL);
109
110                         LASSERT(ptr != NULL);
111
112                         rc1 = le32_to_cpu(*(int *)ptr);
113                 } else {
114                         rc1 = rc;
115                         if (unlikely(rc1 == 0))
116                                 rc1 = -EINVAL;
117                 }
118
119                 oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
120                                        oaui->oaui_data, index, rc1);
121                 osp_async_update_item_fini(env, oaui);
122                 index++;
123         }
124
125         out_destroy_update_req(update);
126
127         return 0;
128 }
129
130 int osp_unplug_async_update(const struct lu_env *env,
131                             struct osp_device *osp,
132                             struct update_request *update)
133 {
134         struct osp_async_update_args    *args;
135         struct ptlrpc_request           *req = NULL;
136         int                              rc;
137
138         rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
139                                  update->ur_buf, UPDATE_BUFFER_SIZE, &req);
140         if (rc != 0) {
141                 struct osp_async_update_item *oaui;
142                 struct osp_async_update_item *next;
143
144                 list_for_each_entry_safe(oaui, next,
145                                          &update->ur_cb_items, oaui_list) {
146                         list_del_init(&oaui->oaui_list);
147                         oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
148                                                oaui->oaui_data, 0, rc);
149                         osp_async_update_item_fini(env, oaui);
150                 }
151                 out_destroy_update_req(update);
152         } else {
153                 LASSERT(list_empty(&update->ur_list));
154
155                 args = ptlrpc_req_async_args(req);
156                 args->oaua_update = update;
157                 req->rq_interpret_reply = osp_async_update_interpret;
158                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
159         }
160
161         return rc;
162 }
163
164 /* with osp::opd_async_requests_mutex held */
165 struct update_request *
166 osp_find_or_create_async_update_request(struct osp_device *osp)
167 {
168         struct update_request *update = osp->opd_async_requests;
169
170         if (update != NULL)
171                 return update;
172
173         update = out_create_update_req(&osp->opd_dt_dev);
174         if (!IS_ERR(update))
175                 osp->opd_async_requests = update;
176
177         return update;
178 }
179
180 /* with osp::opd_async_requests_mutex held */
181 int osp_insert_async_update(const struct lu_env *env,
182                             struct update_request *update, int op,
183                             struct osp_object *obj, int count,
184                             int *lens, const char **bufs, void *data,
185                             osp_async_update_interpterer_t interpterer)
186 {
187         struct osp_async_update_item *oaui;
188         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
189         int                           rc  = 0;
190         ENTRY;
191
192         oaui = osp_async_update_item_init(obj, data, interpterer);
193         if (oaui == NULL)
194                 RETURN(-ENOMEM);
195
196 again:
197         rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
198                                count, lens, bufs);
199         if (rc == -E2BIG) {
200                 osp->opd_async_requests = NULL;
201                 mutex_unlock(&osp->opd_async_requests_mutex);
202
203                 rc = osp_unplug_async_update(env, osp, update);
204                 mutex_lock(&osp->opd_async_requests_mutex);
205                 if (rc != 0)
206                         GOTO(out, rc);
207
208                 update = osp_find_or_create_async_update_request(osp);
209                 if (IS_ERR(update))
210                         GOTO(out, rc = PTR_ERR(update));
211
212                 goto again;
213         }
214
215         if (rc == 0)
216                 list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
217
218         GOTO(out, rc);
219
220 out:
221         if (rc != 0)
222                 osp_async_update_item_fini(env, oaui);
223
224         return rc;
225 }
226
227 /**
228  * If the transaction creation goes to OSP, it means the update
229  * in this transaction only includes remote UPDATE. It is only
230  * used by LFSCK right now.
231  **/
232 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
233 {
234         struct thandle *th = NULL;
235         struct thandle_update *tu = NULL;
236         int rc = 0;
237
238         OBD_ALLOC_PTR(th);
239         if (unlikely(th == NULL))
240                 GOTO(out, rc = -ENOMEM);
241
242         th->th_dev = d;
243         th->th_tags = LCT_TX_HANDLE;
244         atomic_set(&th->th_refc, 1);
245         th->th_alloc_size = sizeof(*th);
246
247         OBD_ALLOC_PTR(tu);
248         if (tu == NULL)
249                 GOTO(out, rc = -ENOMEM);
250
251         INIT_LIST_HEAD(&tu->tu_remote_update_list);
252         tu->tu_only_remote_trans = 1;
253         th->th_update = tu;
254
255 out:
256         if (rc != 0) {
257                 if (tu != NULL)
258                         OBD_FREE_PTR(tu);
259                 if (th != NULL)
260                         OBD_FREE_PTR(th);
261                 th = ERR_PTR(rc);
262         }
263
264         return th;
265 }
266
267 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
268                              struct update_request *update, struct thandle *th,
269                              bool fc)
270 {
271         struct thandle_update   *tu = th->th_update;
272         int                     rc = 0;
273
274         LASSERT(tu != NULL);
275
276         /* If the transaction only includes remote update, it should
277          * still be asynchronous */
278         if (is_only_remote_trans(th)) {
279                 struct osp_async_update_args    *args;
280                 struct ptlrpc_request           *req;
281
282                 list_del_init(&update->ur_list);
283                 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
284                                          update->ur_buf,
285                                          UPDATE_BUFFER_SIZE, &req);
286                 if (rc == 0) {
287                         args = ptlrpc_req_async_args(req);
288                         args->oaua_update = update;
289                         args->oaua_fc = !!fc;
290                         req->rq_interpret_reply =
291                                 osp_async_update_interpret;
292                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
293                 } else {
294                         out_destroy_update_req(update);
295                 }
296         } else {
297                 /* Before we support async update, the cross MDT transaction
298                  * has to been synchronized */
299                 th->th_sync = 1;
300                 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
301                                      update, NULL);
302         }
303
304         return rc;
305 }
306
307 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
308                     struct thandle *th)
309 {
310         struct thandle_update *tu = th->th_update;
311         struct update_request *update;
312         int rc = 0;
313
314         if (tu == NULL)
315                 return rc;
316
317         /* Check whether there are updates related with this OSP */
318         update = out_find_update(tu, dt);
319         if (update == NULL)
320                 return rc;
321
322         /* Note: some updates needs to send before local transaction,
323          * some needs to send after local transaction.
324          *
325          * If the transaction only includes remote updates, it will
326          * send updates to remote MDT in osp_trans_stop.
327          *
328          * If it is remote create, it will send the remote req after
329          * local transaction. i.e. create the object locally first,
330          * then insert the name entry.
331          *
332          * If it is remote unlink, it will send the remote req before
333          * the local transaction, i.e. delete the name entry remote
334          * first, then destroy the local object. */
335         if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
336                 rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th, false);
337
338         return rc;
339 }
340
341 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
342                    struct thandle *th)
343 {
344         struct thandle_update   *tu = th->th_update;
345         struct update_request   *update;
346         int rc = 0;
347
348         LASSERT(tu != NULL);
349         /* Check whether there are updates related with this OSP */
350         update = out_find_update(tu, dt);
351         if (update == NULL) {
352                 if (!is_only_remote_trans(th))
353                         return rc;
354                 goto put;
355         }
356
357         if (update->ur_buf->ub_count == 0) {
358                 out_destroy_update_req(update);
359                 goto put;
360         }
361
362         if (is_only_remote_trans(th)) {
363                 if (th->th_result == 0) {
364                         struct osp_device *osp = dt2osp_dev(th->th_dev);
365
366                         do {
367                                 if (!osp->opd_imp_active ||
368                                     osp->opd_got_disconnected) {
369                                         out_destroy_update_req(update);
370                                         GOTO(put, rc = -ENOTCONN);
371                                 }
372
373                                 /* Get the semaphore to guarantee it has
374                                  * free slot, which will be released via
375                                  * osp_async_update_interpret(). */
376                                 rc = down_timeout(&osp->opd_async_fc_sem, HZ);
377                         } while (rc != 0);
378
379                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
380                                                update, th, true);
381                         if (rc != 0)
382                                 up(&osp->opd_async_fc_sem);
383                 } else {
384                         rc = th->th_result;
385                         out_destroy_update_req(update);
386                 }
387         } else {
388                 if (tu->tu_sent_after_local_trans)
389                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
390                                                update, th, false);
391                 rc = update->ur_rc;
392                 out_destroy_update_req(update);
393         }
394
395 put:
396         thandle_put(th);
397         return rc;
398 }