本文将从多个方面详细阐述CTP程序化交易入门系列,包括行情获取、交易指令下达等。
一、行情获取
在进行程序化交易前,需要获取实时的行情信息。CTP提供了多种获取行情的渠道,包括:
1、使用CTP API获取行情:通过CTP API获取行情数据,具有实时性和数据完整性。
// 示例代码: #include #include "ThostFtdcMdApi.h" class CTraderApi : public CThostFtdcMdSpi { public: //...... virtual void OnRtnDepthMarketData(CThostFtdcDepthMarketDataField *pDepthMarketData); }; void CTraderApi::OnRtnDepthMarketData(CThostFtdcDepthMarketDataField *pDepthMarketData) { // 获取行情数据 } int main() { CThostFtdcMdApi* pMdUserApi = CThostFtdcMdApi::CreateFtdcMdApi(); CTraderApi* pTraderSpi = new CTraderApi(); pMdUserApi->RegisterSpi(pTraderSpi); pMdUserApi->Init(); return 0; }
2、使用K线等历史数据:通过请求历史数据获取行情,缺点是数据不够实时。
// 示例代码: from ctpapi import ApiStruct import ctpapi class MyMdApi(ctpapi.CThostFtdcMdApi): def __init__(self, instruments): ctpapi.CThostFtdcMdApi.__init__(self) self.instruments = instruments def onRtnDepthMarketData(self, pDepthMarketData): # 获取行情数据 if __name__ == '__main__': instruments = ['rb2105', 'i2105'] user = 'xxx' password = 'xxx' broker_id = 'xxx' address = 'xxx' api = MyMdApi(instruments) api.Create("") api.RegisterFront(address) api.Init() loginReq = ApiStruct.ReqUserLogin(BrokerID=broker_id, UserID=user, Password=password) api.ReqUserLogin(loginReq, 1)
二、交易指令下达
获取行情后,需要进行交易指令下达。交易指令下达有多种方式,包括:
1、使用CTP API下单:通过CTP API下单,具有实时性和交互性。
// 示例代码: #include #include "ThostFtdcTraderApi.h" class CTraderApi : public CThostFtdcTraderSpi { public: //...... virtual void OnRtnOrder(CThostFtdcOrderField *pOrder); }; void CTraderApi::OnRtnOrder(CThostFtdcOrderField *pOrder) { // 获取下单结果 } int main() { CThostFtdcTraderApi* pUserApi = CThostFtdcTraderApi::CreateFtdcTraderApi(); CTraderApi* pTraderSpi = new CTraderApi(); pUserApi->RegisterSpi(pTraderSpi); pUserApi->SubscribePublicTopic(THOST_TERT_QUICK); pUserApi->SubscribePrivateTopic(THOST_TERT_QUICK); pUserApi->RegisterFront("tcp://xxx"); pUserApi->Init(); CThostFtdcInputOrderField order = {0}; // 下单代码 pUserApi->ReqOrderInsert(ℴ, nRequestID++); return 0; }
2、通过HTTP请求下单接口下单:使用HTTP请求下单接口发送下单请求,不需要本地安装CTP API,但速度和稳定性相对较低。
// 示例代码: import requests import json def http_order(instrument, price, volume, direction): url = 'http://xxx/api/order' headers = {'content-type': 'application/json'} data = {'instrument': instrument, 'price': price, 'volume': volume, 'direction': direction} r = requests.post(url, data=json.dumps(data), headers=headers) result = json.loads(r.content)["result"] if result: # 下单成功 else: # 下单失败
三、风险管理
在进行程序化交易时,必须考虑风险控制。对于期货交易,风险控制的主要手段包括:
1、资金管理:通过实时计算持仓市值、权益等信息,判断是否需要进行追加保证金或平仓操作。
// 示例代码: #include #include "ThostFtdcTraderApi.h" class CTraderApi : public CThostFtdcTraderSpi { public: //...... virtual void OnRtnTradingNotice(CThostFtdcTradingNoticeInfoField *pTradingNoticeInfo); }; void CTraderApi::OnRtnTradingNotice(CThostFtdcTradingNoticeInfoField *pTradingNoticeInfo) { // 获取到交易通知,判断是否需要进行追加保证金或平仓操作 } int main() { CThostFtdcTraderApi* pUserApi = CThostFtdcTraderApi::CreateFtdcTraderApi(); CTraderApi* pTraderSpi = new CTraderApi(); pUserApi->RegisterSpi(pTraderSpi); pUserApi->SubscribePublicTopic(THOST_TERT_QUICK); pUserApi->SubscribePrivateTopic(THOST_TERT_QUICK); pUserApi->RegisterFront("tcp://xxx"); pUserApi->Init(); return 0; }
2、止损设置:通过设置止损参数,对持仓进行风险控制。
// 示例代码: from ctpapi import ApiStruct import ctpapi class MyMdApi(ctpapi.CThostFtdcMdApi): def __init__(self, instruments, api): ctpapi.CThostFtdcMdApi.__init__(self) self.instruments = instruments self.api = api def onRtnDepthMarketData(self, pDepthMarketData): # 设置止损参数 if (pDepthMarketData.LastPrice >= 3500): order = ApiStruct.InputOrder( InstrumentID='cu2105', LimitPrice=3300, VolumeTotalOriginal=1, OrderPriceType=ApiStruct.OPT_LIMIT_PRICE, Direction=ApiStruct.DIRECTION_SELL, CombOffsetFlag=ApiStruct.OFFSET_OPEN ) self.api.ReqOrderInsert(order, api.nRequestID) else: # ... def set_api(self, api): self.api = api class MyTraderApi(ctpapi.CThostFtdcTraderSpi): def __init__(self, user_id, password, broker_id, md_api): ctpapi.CThostFtdcTraderSpi.__init__(self) self.__req_id = 0 self.__user_id = user_id self.__password = password self.__broker_id = broker_id self.__md_api = md_api def onFrontConnected(self): login_req = ApiStruct.ReqUserLogin( BrokerID=self.__broker_id, UserID=self.__user_id, Password=self.__password) self.__md_api.ReqUserLogin(login_req, self.__req_id) def onRtnOrder(self, order): # 处理订单回报 def onRtnTrade(self, trade): # 处理成交回报 def onRtnInstrumentStatus(self, instrument_status): # 处理合约状态 if __name__ == '__main__': instruments = ['rb2105', 'i2105'] user = 'xxx' password = 'xxx' broker_id = 'xxx' address = 'xxx' api = ctpapi.CThostFtdcTraderApi_CreateFtdcTraderApi() api.RegisterFront(address) api.SubscribePrivateTopic(ctpapi.TERT_QUICK) api.SubscribePublicTopic(ctpapi.TERT_QUICK) md_api = MyMdApi(instruments, api) md_api.Create("") md_api.set_api(api) md_api.RegisterFront(address) md_api.Init() trader = MyTraderApi(user, password, broker_id, md_api) api.RegisterSpi(trader) api.Init() api.Join()
四、策略开发
在实际应用中,程序化交易主要需要用到策略开发,即开发交易策略并实现自动下单。主要步骤如下:
1、根据市场情况和投资组合确定交易策略。
2、编写交易规则:定义入场/出场、止损/止盈条件、持仓跟踪等参数。
// 示例代码: class MyMdApi(ctpapi.CThostFtdcMdApi): def __init__(self, instruments, api): ctpapi.CThostFtdcMdApi.__init__(self) self.instruments = instruments self.api = api def onRtnDepthMarketData(self, pDepthMarketData): # 确定交易策略 if pDepthMarketData.InstrumentID == 'rb2105': if (pDepthMarketData.LastPrice >= 3500): # 行情符合交易策略 order = ApiStruct.InputOrder( InstrumentID='cu2105', LimitPrice=3300, VolumeTotalOriginal=1, OrderPriceType=ApiStruct.OPT_LIMIT_PRICE, Direction=ApiStruct.DIRECTION_SELL, CombOffsetFlag=ApiStruct.OFFSET_OPEN ) self.api.ReqOrderInsert(order, api.nRequestID) else: # ... def set_api(self, api): self.api = api class MyTraderApi(ctpapi.CThostFtdcTraderSpi): def __init__(self, user_id, password, broker_id, md_api): ctpapi.CThostFtdcTraderSpi.__init__(self) self.__req_id = 0 self.__user_id = user_id self.__password = password self.__broker_id = broker_id self.__md_api = md_api def onFrontConnected(self): login_req = ApiStruct.ReqUserLogin( BrokerID=self.__broker_id, UserID=self.__user_id, Password=self.__password) self.__md_api.ReqUserLogin(login_req, self.__req_id) def onRtnOrder(self, order): # 处理订单回报 def onRtnTrade(self, trade): # 处理成交回报 def onRtnInstrumentStatus(self, instrument_status): # 处理合约状态 if __name__ == '__main__': instruments = ['rb2105', 'i2105'] user = 'xxx' password = 'xxx' broker_id = 'xxx' address = 'xxx' api = ctpapi.CThostFtdcTraderApi_CreateFtdcTraderApi() api.RegisterFront(address) api.SubscribePrivateTopic(ctpapi.TERT_QUICK) api.SubscribePublicTopic(ctpapi.TERT_QUICK) md_api = MyMdApi(instruments, api) md_api.Create("") md_api.set_api(api) md_api.RegisterFront(address) md_api.Init() trader = MyTraderApi(user, password, broker_id, md_api) api.RegisterSpi(trader) api.Init() api.Join()
3、进行回测和优化:使用历史数据和交易规则进行模拟回测,评估策略的效果并进行优化和调整。
// 示例代码: def backtest(testdata: pd.DataFrame, strategy: object) -> pd.DataFrame: result = pd.DataFrame(columns=['datetime', 'price', 'action']) for i in range(0, len(testdata)): action = strategy.run(testdata.iloc[i]) if action: result = result.append( {'datetime': testdata.iloc[i]['datetime'], 'price': testdata.iloc[i]['price'], 'action': action}, ignore_index=True) return result class MyStrategy(): def __init__(self): pass def run(self, data): # 执行交易规则 if data['price'] >= 3500: return 'sell' return False if __name__ == '__main__': testdata = pd.read_csv('testdata.csv') strategy = MyStrategy() backtest_result = backtest(testdata, strategy) # 输出回测结果
原创文章,作者:RAVHS,如若转载,请注明出处:https://www.506064.com/n/374675.html