本文目錄一覽:
- 1、python 多線程logger問題
- 2、python 運維常用腳本
- 3、單細胞轉錄組雙細胞判別軟體scDblFinder
- 4、PYTHON!!!!!!!!!!!!!!!!!!!!!!!
- 5、關於python的幾個小編程 急!
python 多線程logger問題
因為logging是threadsafe的,但不是process-safe(應該沒有這個詞兒,只是為了便於理解)的。這段代碼就是多個進程共同操作一個日誌文件。這種情況下,logging的行為就很難說了。
我測試了一下,日誌中大概幾百行。而且,可以看到一些順序錯亂現象:
Fri, 08 Aug 2014 01:19:38 logging_in_multithread.py[line:40] theadWorking ERROR 2
FFri, 08 Aug 2014 01:19:36 logging_in_multithread.py[line:40] theadWorking ERROR 11(注意這裡的FFri)
把代碼這樣改:
for num in range(processNum):
p = Process(target=processWorking, args=(‘2’,))
processs.append(p)
p.start()
p.join()
還有其他方法,比如:為logging實現一個FileHandler,以使logging在multiple process的環境下也能正常工作。這是我從網上了解到的做法,自己還沒實踐過。
Python Manual中logging Cookbook中有這麼一段話:
Logging to a single file from multiple processes
Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.)
這段話中也提出了另外一種解決方案。
python 運維常用腳本
Python 批量遍歷目錄文件,並修改訪問時間
import os
path = “D:/UASM64/include/”
dirs = os.listdir(path)
temp=[];
for file in dirs:
temp.append(os.path.join(path, file))
for x in temp:
os.utime(x, (1577808000, 1577808000))
Python 實現的自動化伺服器管理
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
def ssh_put(user,passwd,source,target):
while True:
try:
shell=str(input(“[Shell] # “))
if (shell == “”):
continue
elif (shell == “exit”):
exit()
elif (shell == “put”):
ssh_put(“root”,”123123″,”./a.py”,”/root/a.py”)
elif (shell ==”cron”):
temp=input(“輸入一個計劃任務: “)
temp1=”(crontab -l; echo “+ temp + “) |crontab”
ssh_cmd(“root”,”123123″,”22″,”./user_ip.conf”,temp1)
elif (shell == “uncron”):
temp=input(“輸入要刪除的計劃任務: “)
temp1=”crontab -l | grep -v ” “+ temp + “|crontab”
ssh_cmd(“root”,”123123″,”22″,”./user_ip.conf”,temp1)
else:
ssh_cmd(“lyshark”,”123123″,”22″,”./user_ip.conf”,shell)
遍歷目錄和文件
import os
def list_all_files(rootdir):
import os
_files = []
list = os.listdir(rootdir) #列出文件夾下所有的目錄與文件
for i in range(0,len(list)):
path = os.path.join(rootdir,list[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
return _files
a=list_all_files(“C:/Users/LyShark/Desktop/a”)
print(a)
python檢測指定埠狀態
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)
for ip in range(0,254):
try:
sk.connect((“192.168.1.”+str(ip),443))
print(“192.168.1.%d server open \n”%ip)
except Exception:
print(“192.168.1.%d server not open”%ip)
sk.close()
python實現批量執行CMD命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print(“——————————\n”)
print(“使用說明,在當前目錄創建ip.txt寫入ip地址”)
print(“——————————\n”)
user=input(“輸入用戶名:”)
passwd=input(“輸入密碼:”)
port=input(“輸入埠:”)
cmd=input(“輸入執行的命令:”)
file = open(“./ip.txt”, “r”)
line = file.readlines()
for i in range(len(line)):
print(“對IP: %s 執行”%line[i].strip(‘\n’))
python3-實現釘釘報警
import requests
import sys
import json
dingding_url = ‘ ‘
data = {“msgtype”: “markdown”,”markdown”: {“title”: “監控”,”text”: “apche異常”}}
headers = {‘Content-Type’:’application/json;charset=UTF-8′}
send_data = json.dumps(data).encode(‘utf-8’)
requests.post(url=dingding_url,data=send_data,headers=headers)
import psutil
import requests
import time
import os
import json
monitor_name = set([‘httpd’,’cobblerd’]) # 用戶指定監控的服務進程名稱
proc_dict = {}
proc_name = set() # 系統檢測的進程名稱
monitor_map = {
‘httpd’: ‘systemctl restart httpd’,
‘cobblerd’: ‘systemctl restart cobblerd’ # 系統在進程down掉後,自動重啟
}
dingding_url = ‘ ‘
while True:
for proc in psutil.process_iter(attrs=[‘pid’,’name’]):
proc_dict[proc.info[‘pid’]] = proc.info[‘name’]
proc_name.add(proc.info[‘name’])
判斷指定埠是否開放
import socket
port_number = [135,443,80]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((‘127.0.0.1’, index))
if result == 0:
print(“Port %d is open” % index)
else:
print(“Port %d is not open” % index)
sock.close()
判斷指定埠並且實現釘釘輪詢報警
import requests
import sys
import json
import socket
import time
def dingding(title,text):
dingding_url = ‘ ‘
data = {“msgtype”: “markdown”,”markdown”: {“title”: title,”text”: text}}
headers = {‘Content-Type’:’application/json;charset=UTF-8′}
send_data = json.dumps(data).encode(‘utf-8’)
requests.post(url=dingding_url,data=send_data,headers=headers)
def net_scan():
port_number = [80,135,443]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((‘127.0.0.1’, index))
if result == 0:
print(“Port %d is open” % index)
else:
return index
sock.close()
while True:
dingding(“Warning”,net_scan())
time.sleep(60)
python-實現SSH批量CMD執行命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
file = open(userfile, “r”)
line = file.readlines()
for i in range(len(line)):
print(“對IP: %s 執行”%line[i].strip(‘\n’))
ssh.connect(hostname=line[i].strip(‘\n’),port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
ssh_cmd(“lyshark”,”123″,”22″,”./ip.txt”,”free -h |grep ‘Mem:’ |awk ‘{print $3}'”)
用python寫一個列舉當前目錄以及所有子目錄下的文件,並列印出絕對路徑
import sys
import os
for root,dirs,files in os.walk(“C://”):
for name in files:
print(os.path.join(root,name))
os.walk()
按照這樣的日期格式(xxxx-xx-xx)每日生成一個文件,例如今天生成的文件為2013-09-23.log, 並且把磁碟的使用情況寫到到這個文件中。
import os
import sys
import time
new_time = time.strftime(“%Y-%m-%d”)
disk_status = os.popen(“df -h”).readlines()
str1 = ”.join(disk_status)
f = open(new_time+’.log’,’w’)
f.write(“%s”%str1)
f.flush()
f.close()
統計出每個IP的訪問量有多少?(從日誌文件中查找)
import sys
list = []
f = open(“/var/log/httpd/access_log”,”r”)
str1 = f.readlines()
f.close()
for i in str1:
ip=i.split()[0]
list.append(ip)
list_num=set(list)
for j in list_num:
num=list.count(j)
print(“%s —– %s” %(num,j))
寫個程序,接受用戶輸入數字,並進行校驗,非數字給出錯誤提示,然後重新等待用戶輸入。
import tab
import sys
while True:
try:
num=int(input(“輸入數字:”).strip())
for x in range(2,num+1):
for y in range(2,x):
if x % y == 0:
break
else:
print(x)
except ValueError:
print(“您輸入的不是數字”)
except KeyboardInterrupt:
sys.exit(“\n”)
ps 可以查看進程的內存佔用大小,寫一個腳本計算一下所有進程所佔用內存大小的和。
import sys
import os
list=[]
sum=0
str1=os.popen(“ps aux”,”r”).readlines()
for i in str1:
str2=i.split()
new_rss=str2[5]
list.append(new_rss)
for i in list[1:-1]:
num=int(i)
sum=sum+num
print(“%s — %s”%(list[0],sum))
關於Python 命令行參數argv
import sys
if len(sys.argv) 2:
print (“沒有輸入任何參數”)
sys.exit()
if sys.argv[1].startswith(“-“):
option = sys.argv[1][1:]
利用random生成6位數字加字母隨機驗證碼
import sys
import random
rand=[]
for x in range(6):
y=random.randrange(0,5)
if y == 2 or y == 4:
num=random.randrange(0,9)
rand.append(str(num))
else:
temp=random.randrange(65,91)
c=chr(temp)
rand.append(c)
result=””.join(rand)
print(result)
自動化-使用pexpect非交互登陸系統
import pexpect
import sys
ssh = pexpect.spawn(‘ssh lyshark@59.110.167.239’)
fout = file(‘sshlog.txt’, ‘w’)
ssh.logfile = fout
ssh.expect(“lyshark@59.110.167.239’s password:”)
ssh.sendline(“密碼”)
ssh.expect(‘#’)
ssh.sendline(‘ls /home’)
ssh.expect(‘#’)
Python-取系統時間
import sys
import time
time_str = time.strftime(“日期:%Y-%m-%d”,time.localtime())
print(time_str)
time_str= time.strftime(“時間:%H:%M”,time.localtime())
print(time_str)
psutil-獲取內存使用情況
import sys
import os
import psutil
memory_convent = 1024 * 1024
mem =psutil.virtual_memory()
print(“內存容量為:”+str(mem.total/(memory_convent))+”MB\n”)
print(“已使用內存:”+str(mem.used/(memory_convent))+”MB\n”)
print(“可用內存:”+str(mem.total/(memory_convent)-mem.used/(1024*1024))+”MB\n”)
print(“buffer容量:”+str(mem.buffers/( memory_convent ))+”MB\n”)
print(“cache容量:”+str(mem.cached/(memory_convent))+”MB\n”)
Python-通過SNMP協議監控CPU
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import os
def getAllitems(host, oid):
sn1 = os.popen(‘snmpwalk -v 2c -c public ‘ + host + ‘ ‘ + oid + ‘|grep Raw|grep Cpu|grep -v Kernel’).read().split(‘\n’)[:-1]
return sn1
def getDate(host):
items = getAllitems(host, ‘.1.3.6.1.4.1.2021.11’)
if name == ‘ main ‘:
Python-通過SNMP協議監控系統負載
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import os
import sys
def getAllitems(host, oid):
sn1 = os.popen(‘snmpwalk -v 2c -c public ‘ + host + ‘ ‘ + oid).read().split(‘\n’)
return sn1
def getload(host,loid):
load_oids = ‘1.3.6.1.4.1.2021.10.1.3.’ + str(loid)
return getAllitems(host,load_oids)[0].split(‘:’)[3]
if name == ‘ main ‘:
Python-通過SNMP協議監控內存
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import os
def getAllitems(host, oid):
def getSwapTotal(host):
def getSwapUsed(host):
def getMemTotal(host):
def getMemUsed(host):
if name == ‘ main ‘:
Python-通過SNMP協議監控磁碟
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
def getDate(source,newitem):
def getRealDate(item1,item2,listname):
def caculateDiskUsedRate(host):
if name == ‘ main ‘:
Python-通過SNMP協議監控網卡流量
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
sn1 = os.popen(‘snmpwalk -v 2c -c public ‘ + host + ‘ ‘ + oid).read().split(‘\n’)[:-1]
return sn1
def getDevices(host):
device_mib = getAllitems(host,’RFC1213-MIB::ifDescr’)
device_list = []
def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
date = []
if name == ‘ main ‘:
Python-實現多級菜單
import os
import sys
ps=”[None]-“
ip=[“192.168.1.1″,”192.168.1.2″,”192.168.1.3”]
flage=1
while True:
ps=”[None]-“
temp=input(ps)
if (temp==”test”):
print(“test page !!!!”)
elif(temp==”user”):
while (flage == 1):
ps=”[User]-“
temp1=input(ps)
if(temp1 ==”exit”):
flage=0
break
elif(temp1==”show”):
for i in range(len(ip)):
print(i)
Python實現一個沒用的東西
import sys
ps=”[root@localhost]# “
ip=[“192.168.1.1″,”192.168.1.2″,”192.168.1.3”]
while True:
temp=input(ps)
temp1=temp.split()
檢查各個進程讀寫的磁碟IO
import sys
import os
import time
import signal
import re
class DiskIO:
def init (self, pname=None, pid=None, reads=0, writes=0):
self.pname = pname
self.pid = pid
self.reads = 0
self.writes = 0
def main():
argc = len(sys.argv)
if argc != 1:
print (“usage: please run this script like [./lyshark.py]”)
sys.exit(0)
if os.getuid() != 0:
print (“Error: This script must be run as root”)
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system(‘echo 1 /proc/sys/vm/block_dump’)
print (“TASK PID READ WRITE”)
while True:
os.system(‘dmesg -c /tmp/diskio.log’)
l = []
f = open(‘/tmp/diskio.log’, ‘r’)
line = f.readline()
while line:
m = re.match(
‘^(\S+)(\d+)(\d+): (READ|WRITE) block (\d+) on (\S+)’, line)
if m != None:
if not l:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
continue
found = False
for item in l:
if item.pid == m.group(2):
found = True
if m.group(3) == “READ”:
item.reads = item.reads + 1
elif m.group(3) == “WRITE”:
item.writes = item.writes + 1
if not found:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
time.sleep(1)
for item in l:
print (“%-10s %10s %10d %10d” %
(item.pname, item.pid, item.reads, item.writes))
def signal_handler(signal, frame):
os.system(‘echo 0 /proc/sys/vm/block_dump’)
sys.exit(0)
if name ==” main “:
main()
利用Pexpect實現自動非交互登陸linux
import pexpect
import sys
ssh = pexpect.spawn(‘ssh root@59.110.167.239’)
fout = file(‘sshlog.log’, ‘w’)
ssh.logfile = fout
ssh.expect(“root@59.110.167.239’s password:”)
ssh.sendline(“密碼”)
ssh.expect(‘#’)
ssh.sendline(‘ls /home’)
ssh.expect(‘#’)
利用psutil模塊獲取系統的各種統計信息
import sys
import psutil
import time
import os
time_str = time.strftime( “%Y-%m-%d”, time.localtime( ) )
file_name = “./” + time_str + “.log”
if os.path.exists ( file_name ) == False :
os.mknod( file_name )
handle = open ( file_name , “w” )
else :
handle = open ( file_name , “a” )
if len( sys.argv ) == 1 :
print_type = 1
else :
print_type = 2
def isset ( list_arr , name ) :
if name in list_arr :
return True
else :
return False
print_str = “”;
if ( print_type == 1 ) or isset( sys.argv,”mem” ) :
memory_convent = 1024 * 1024
mem = psutil.virtual_memory()
print_str += ” 內存狀態如下:\n”
print_str = print_str + ” 系統的內存容量為: “+str( mem.total/( memory_convent ) ) + ” MB\n”
print_str = print_str + ” 系統的內存以使用容量為: “+str( mem.used/( memory_convent ) ) + ” MB\n”
print_str = print_str + ” 系統可用的內存容量為: “+str( mem.total/( memory_convent ) – mem.used/( 1024*1024 )) + “MB\n”
print_str = print_str + ” 內存的buffer容量為: “+str( mem.buffers/( memory_convent ) ) + ” MB\n”
print_str = print_str + ” 內存的cache容量為:” +str( mem.cached/( memory_convent ) ) + ” MB\n”
if ( print_type == 1 ) or isset( sys.argv,”cpu” ) :
print_str += ” CPU狀態如下:\n”
cpu_status = psutil.cpu_times()
print_str = print_str + ” user = ” + str( cpu_status.user ) + “\n”
print_str = print_str + ” nice = ” + str( cpu_status.nice ) + “\n”
print_str = print_str + ” system = ” + str( cpu_status.system ) + “\n”
print_str = print_str + ” idle = ” + str ( cpu_status.idle ) + “\n”
print_str = print_str + ” iowait = ” + str ( cpu_status.iowait ) + “\n”
print_str = print_str + ” irq = ” + str( cpu_status.irq ) + “\n”
print_str = print_str + ” softirq = ” + str ( cpu_status.softirq ) + “\n”
print_str = print_str + ” steal = ” + str ( cpu_status.steal ) + “\n”
print_str = print_str + ” guest = ” + str ( cpu_status.guest ) + “\n”
if ( print_type == 1 ) or isset ( sys.argv,”disk” ) :
print_str += ” 硬碟信息如下:\n”
disk_status = psutil.disk_partitions()
for item in disk_status :
print_str = print_str + ” “+ str( item ) + “\n”
if ( print_type == 1 ) or isset ( sys.argv,”user” ) :
print_str += ” 登錄用戶信息如下:\n “
user_status = psutil.users()
for item in user_status :
print_str = print_str + ” “+ str( item ) + “\n”
print_str += “—————————————————————\n”
print ( print_str )
handle.write( print_str )
handle.close()
import psutil
mem = psutil.virtual_memory()
print mem.total,mem.used,mem
print psutil.swap_memory() # 輸出獲取SWAP分區信息
cpu = psutil.cpu_stats()
printcpu.interrupts,cpu.ctx_switches
psutil.cpu_times(percpu=True) # 輸出每個核心的詳細CPU信息
psutil.cpu_times().user # 獲取CPU的單項數據 [用戶態CPU的數據]
psutil.cpu_count() # 獲取CPU邏輯核心數,默認logical=True
psutil.cpu_count(logical=False) # 獲取CPU物理核心數
psutil.disk_partitions() # 列出全部的分區信息
psutil.disk_usage(‘/’) # 顯示出指定的掛載點情況【位元組為單位】
psutil.disk_io_counters() # 磁碟總的IO個數
psutil.disk_io_counters(perdisk=True) # 獲取單個分區IO個數
psutil.net_io_counter() 獲取網路總的IO,默認參數pernic=False
psutil.net_io_counter(pernic=Ture)獲取網路各個網卡的IO
psutil.pids() # 列出所有進程的pid號
p = psutil.Process(2047)
p.name() 列出進程名稱
p.exe() 列出進程bin路徑
p.cwd() 列出進程工作目錄的絕對路徑
p.status()進程當前狀態[sleep等狀態]
p.create_time() 進程創建的時間 [時間戳格式]
p.uids()
p.gids()
p.cputimes() 【進程的CPU時間,包括用戶態、內核態】
p.cpu_affinity() # 顯示CPU親緣關係
p.memory_percent() 進程內存利用率
p.meminfo() 進程的RSS、VMS信息
p.io_counters() 進程IO信息,包括讀寫IO數及位元組數
p.connections() 返回打開進程socket的namedutples列表
p.num_threads() 進程打開的線程數
import psutil
from subprocess import PIPE
p =psutil.Popen([“/usr/bin/python” ,”-c”,”print ‘helloworld'”],stdout=PIPE)
p.name()
p.username()
p.communicate()
p.cpu_times()
psutil.users() # 顯示當前登錄的用戶,和Linux的who命令差不多
psutil.boot_time() 結果是個UNIX時間戳,下面我們來轉換它為標準時間格式,如下:
datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的結果不是str格式,繼續進行轉換 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime(‘%Y-%m-%d%H:%M:%S’)
Python生成一個隨機密碼
import random, string
def GenPassword(length):
if name == ‘ main ‘:
print (GenPassword(6))
單細胞轉錄組雙細胞判別軟體scDblFinder
起因: 最近有個問題樣本,跑完cellranger,樣本的cellranger結果如下,細胞數目極高(3W+)。在後續數據質控分析中,線粒體基因佔比和雙細胞率均很高,用scDblFinder進行雙細胞預測,雙細胞佔率竟然高達34%。我很好奇,雙細胞率為什麼會這麼高,如何審視這個結果?決定看看scDblFinder的細節。
問題1:如何理解Doublets?
在scRNA-seq的細胞捕獲步驟中,兩個或多個細胞聚集成單個液滴(雙聯體/多聯體)會導致混合的轉錄組,也就是兩個或多個細胞共用一個barcode,稱為doublets或multiplets(後面統稱為doublets)。它是基於液滴的單細胞測序的技術副產品。雙細胞會造成每個「細胞」的高UMI計數,改變cluster的細胞類型鑒定干擾到下游分析。這會導致對稀有細胞類型、中間細胞狀態和疾病相關轉錄組學特徵的人為錯誤發現。雙細胞率已被證明與捕獲的細胞數量成正比(Bloom 2018; Kang et al. 2018)。
雙細胞可以分為同型(相同細胞類型)或異型(不同細胞類型)。
問題2: Cell Ranger可以自動剔除doublets和multiplets嗎?
答:目前沒有方法可以識別與雙細胞中單個細胞相關的轉錄本信息。
10X官網對雙細胞率的 相關答覆 ,我們目前沒有一種方法通過演算法識別單個物種的單細胞基因表達數據,barcode是否包含多個細胞。
目前,Cell Ranger 軟體僅在barnyard實驗或多物種實驗中估計雙細胞率。
對此,10X也給出三條參考意見:
他的意思是:1) 通過已知細胞類型的marker基因來鑒別雙細胞,比如T/B細胞的marker基因,在同一個barcode細胞中同時高表達就可判定為雙細胞;2) seurat標準分析流程,質控環節通過UMI和gene指標過濾;3)運用scDblFinder雙細胞預測軟體。
問題3:10X Genomics 單細胞實驗中估計的雙細胞率是多少?
假設不存在細胞結團,可使用下表(取自 10X基因組學用戶指南 )來估計單細胞實驗中估計的雙細胞率。
雙細胞率(0.8%/1000cells),如果細胞數為1W,雙細胞率為7.6%,約8%。
單細胞實驗雙細胞率為10-20%,這個數值明顯高於上面10X給出的雙細胞率(~8%)。這個怎麼理解呢?
我想到的幾個原因:
1)10X給出的是理想情況的雙細胞率(細胞不結團),用標準樣本做基準比較;
2)雙細胞率跟實驗環節中的樣本處理和細胞上樣量都有關係;
3)還取決如何計算雙細胞,雙細胞率=雙細胞數目/總細胞數;因計算的細胞總體不同而不同;
我們一般會進行QC細胞質控,用UMI/genes指標過濾掉低質量細胞和異常值細胞;
如果QC細胞質控後計算雙細胞率,和QC細胞質控前計算雙細胞率,預測的雙細胞率會不一致;
最近,在網上找到一個10x Genomics 提供的估計雙細胞率是:
比如1W個細胞,雙細胞率為:0.008*(10000/1000)=0.08=8%。
下面我們看看scDblFinder軟體是如何具體執行的。
雙細胞在單細胞測序數據中很普遍,可能會導致人為錯誤的發現。
目前,實驗層面還是無法檢測同一樣本的細胞形成的雙細胞,包括異型雙細胞。
演算法層,已經開發了許多計算方法來根據轉錄譜識別雙細胞。大多數這些方法依賴於通過對真實細胞求和或求平均來生成人造雙細胞,並對它們與真實細胞之間的相似性進行評分。 例如,DoubletFinder在真實細胞和人工雙細胞的合集上生成k最近鄰近圖(kNN) ,並估計每個細胞附近人造雙細胞的密度(McGinnis, Murrow, and Gartner 2019)。 以類似的方式,Bais和Kostka (2020) 提出的bcds演算法和共表達評分cxds演算法。
Xi 和 Li (2021a) 最近發表的文章中對雙細胞檢測方法進行基準測試,使用模擬數據和包含雙細胞實驗標記的真實數據數據集,發現DoubletFinder的演算法最為準確。 但是,基準測試也發現,沒有一種方法在所有數據集上都是系統性最優,強調在各種數據集上測試和基準測試方法的必要性,並表明某些演算法在不同情況下可能具有優勢和劣勢。
沒有一種演算法是完美不缺的,特別是預測模型演算法。
下圖比較了scDblFinder 這個包中一些方法(以粗體顯示)與其他方法:
因此,我們在單細胞轉錄組數據質控過濾時,會考慮到雙細胞的因素,通過相關軟體進行預測雙細胞。常用的軟體有 scDblFinder (R語言)和 Scrublet (python)。這裡僅討論scDblFinder。
安裝scDblFinder需要滿足R = 4.0 和 Bioconductor = 3.12
scDblFinder的輸入數據是 SingleCellExperiment 對象(空的drops已經移出),至少要包含counts矩陣(assay 『counts』)。即sce對象都不應該包含空滴,但不應該經過非常嚴格的過濾(這會影響雙細胞率的估計)。
如果還包含歸一化矩陣 (assay 『logcounts』) 和PCA (reducedDim 『PCA』),可以使用scDblFinder的cluster模式(不常見)。
對於 10x 數據,通常將dbr留空是安全的,它會自動估計。 scDblFinder的輸出會在sce的colData中添加一些以『scDblFinder』為前綴的列,其中最重要的是:
如果你有多個樣本(理解為不同的細胞捕獲),那麼最好為每個樣本分別進行雙細胞識別(對於cell hashes實驗中的多重樣本,那意味著每個批次)。 可以通過簡單地向scDblFinder的samples參數提供樣本 id來完成,或者,將樣本信息存儲在colData列中,提供列名即可。另外,還可以考慮使用BPPARAM參數對其進行多線程處理(假設有足夠的RAM)。 例如:
我們用之前案例中的數據測試下scDblFinder函數。
單個樣本
該樣本共10194個細胞,其中968個細胞被預測為雙細胞,雙細胞率為9.5%
那麼該如何審視這個結果,選擇怎樣的參數?
我想到的是,對於一個預測模型來說,調參的意義不大,我們對結果沒有預判,修改參數,都會出現不同的預測值。另外,對於一個常規的10X單細胞轉錄組數據,我們對雙細胞率是有一定預判的,10X的實驗步驟大致固定,cellranger的細胞數大致1W,雙細胞率大致10%,我們知道預測的邊界。我們有粗略的「標尺」。
但是現在,cellranger給出的細胞數是3W+,我們其實是不清楚雙細胞率的邊界,細胞數「超綱」了,只知道細胞數越多,雙細胞也會越多。這類樣本太少,我們沒有橫向可參考的實例。如果出現這種結果,最應該審視的是實驗端出了什麼問題,線粒體基因佔比也非常高。
scDblFinder有兩種生成人造雙細胞的主要模式: 隨機模式 (scDblFinder.random,clusters=FALSE, 默認方式)和 基於cluster的模式 (scDblFinder.clusters,clusters=TRUE 或提供你自定義的cluster – 以前版本的方法)。在實際中,我們觀察到兩種方法都表現良好(比其他方法要好)。當數據集被分成清晰的cluster時,教程建議使用基於cluster的方法(例如發展軌跡),否則使用隨機模式。
雙細胞分為 同型雙細胞 (’Homotypic’ doublets)和 異性雙細胞 ( ‘Heterotypic’ doublets)。同型雙細胞由相同類型的細胞(即相似的轉錄狀態)組成,僅根據它們的轉錄組信息很難辨識。而且,它們對於大多數分析來說也相對無害,因為它們看起來與單細胞高度相似。 相反,異型雙細胞(由具有不同轉錄狀態的細胞形成)表現為一種人為的新型細胞類型,會影響下游分析。
scDblFinder只關注異性雙細胞。
step1: 將數據集縮減到僅高表達的基因(默認為 1000); 如果使用基於cluster的方法,則會選擇每個cluster的表達靠前的基因。另外使用基於cluster的方法(而不是人為指定cluster),將會執行快速聚類(請參閱fastcluster)。
step2: 通過組合不同cluster的細胞來創建人工雙細胞,創建的人工雙細胞數與cluster的數目成比例。 我們主要關注不同cluster間的雙細胞,我們不會試圖識別同型雙細胞,無論如何,它們實際上無法識別且對下游分析相對無害。因此,我們減少了人工雙細胞的必要數量, 也防止分類器被訓練以識別與單細胞無法區分的細胞(因此將單細胞稱為雙細胞)。 scDblFinder另一種策略是生成完全隨機的人工雙細胞,並使用迭代程序從訓練中排除無法識別的人工雙細胞。 在實踐中,這兩種方法具有相當的性能,它們也可以結合使用。
step3: 然後對真實細胞和人工雙細胞的合集進行降維,並生成最近鄰網路。 接著使用網路來估計每個細胞的許多特徵,特別是最近鄰居中人工雙細胞的比例。 該比率不是選擇特定的鄰域大小,該比率是在不同的 k 值下計算的,通過使用多個預測變數創建分類器。預測變數還包括距離加權比,進一步添加了的細胞層面上的預測變數:對主成分的預測;文庫大小; 和共表達分數(基於Bais 和 Kostka2020 的變體)。 然後 scDblFinder訓練梯度提升樹( gradient boosted trees ),以根據這些特徵區分來自真實細胞的人工雙細胞。 最後,閾值程序通過同時最小化錯誤分類率和預期的雙細胞率來決定調用細胞的分數(參見Thresholding)。
step4: 基於分類器方法的一個關鍵問題是一些真實細胞被錯誤標記,從某種意義上說,它們實際上是雙細胞,但被標記為單細胞。這些會誤導分類器。出於這個原因,分類和閾值處理以迭代方式執行:在每一輪中,從下一輪的訓練集中刪除識別為雙細胞的真實細胞。
雙細胞只能出現在給定的樣本或某次捕獲中,因此需要為每個樣本單獨進行雙細胞判別,這也加快了分析速度。如果給定samples參數,scDblFinder將利用該參數將細胞拆分為單個樣本/捕獲,並在給出BPPARAM參數的情況下並行分析。分類器將在全局範圍內進行訓練,但閾值將在每個樣本的基礎上進行優化。如果你的樣品是多標籤,即不同的樣品混合在不同的批次中,那麼需要提供批次信息。
通過將數據集減少到僅高表達的基因(由nfeatures參數控制),可以大大加快分析速度,即使會稍微影響到準確度。然後,根據cluster參數,將執行最終的PCA和聚類(使用內部 fastcluster函數)。基於cluster方法的基本原理是同型雙細胞幾乎不可能根據它們的轉錄組進行區分,因此創建這種雙細胞是一種計算資源的浪費,而且還會誤導分類器標記為單細胞。
然而,另一種方法是隨機生成雙細胞(將clusters設置為 FALSE 或 NULL),並使用迭代方法從訓練中排除無法識別的人工雙細胞。
根據cluster和propRandom參數,將通過合併隨機細胞和/或不相同的cluster對的細胞合併,形成人工雙細胞(這可以使用getArtificialDoublets函數手動執行)。 一部分雙細胞將簡單地使用組成細胞的counts總和,而其餘的將進行調整文庫大小和進行泊松重採樣,數據校正。
對真實細胞和人工細胞的組合執行新的PCA,從中生成 kNN網路。 使用這個 kNN,為每個細胞收集了許多參數,例如 KNN中雙細胞的比例、到最近雙細胞和最近非雙細胞的距離之比等。在輸出中報告了一些帶有「scDblFinder.」前綴的功能,例如:
scDblFinder有相當多的參數來控制預處理、雙細胞的生成、分類等(參見?scDblFinder)。 我們僅對重要參數進行說明。
雙細胞的期望檢出率對鄰域中人造雙細胞的密度沒有影響,但會影響分類器的分數,特別是分類臨界值。是通過dbr和dbr.sd參數指定(dbr.sd指定dbr周圍的 +/- 範圍,在該範圍內與dbr的偏差將被視為空)。
對於10x數據,捕獲的細胞越多,產生雙細胞的概率越大,Chromium文檔表明每1000個細胞捕獲的雙細胞率大約為 1%(因此對於 5000 個細胞,(0.01 5) 5000 = 250 個雙細胞) ,scDblFinder默認的預期雙細胞率將設置為0.1(默認標準偏差為 0.015)。但是請注意,不同的實驗方案可能會產生更多的雙細胞率,因此需要相應地更新。如果不確定雙細胞率,您可能會考慮增加 dbr.sd,以便大多數/純粹從錯誤分類錯誤中估計它。
那麼你很可能有錯誤的雙細胞率。 如果你沒有提供dbr參數,雙細胞率將使用10X Genomics預期雙細胞率自動計算,這意味著捕獲的細胞越多,雙細胞率就越高。 如果你認為不適用於你的數據,可手動設置dbr。
如果出現意外高的雙細胞率最常見原因是,1)你有一個多樣本數據集並且沒有按樣本進行拆分。 scDblFinder會認為數據是具有大量細胞的單次捕獲,因此具有非常高的雙細胞率。 按樣本拆分應該可以解決問題。
閾值根據預期雙細胞數量和錯誤分類(即人造雙細胞)試圖最小化方差,這意味著有效(即最終)雙細胞率將與給定的不同。 scDblFinder還認為假陽性比假陰性對後續的分析問題要小些。你可以通過設置 [dbr.sd=0]在一定程度上減少與輸入雙細胞率的偏差。
雖然這兩種方法在基準測試中的表現非常相似,但隨機生成方法在複雜數據集中通常略勝一籌。 如果你的數據被非常清晰地劃分為cluster,或者你對雙細胞的起源感興趣,則基於cluster的方法更可取。 這也將能夠更準確地計算同型雙細胞率,因此略好於閾值法(thresholding)。 否則,特別是如果你的數據沒有非常清楚地劃分為cluster,則隨機方法(例如clusters= FALSE)更可取。
如果你在多樣本數據集上運行scDblFinder但是未提供cluster標籤,而是基於特定樣本的標籤(意味著一個樣本中的標籤「1」可能與另一個樣本中的標籤「1」無關),並且 在 tSNE上繪製它們看起來沒有意義。 出於這個原因,當運行多個樣本時,建議首先將所有樣本聚集在一起(例如使用 sce$cluster – fastcluster(sce)),然後將cluster信息提供給 scDblFinder。
如果某些細胞的讀數為零(或非常接近於零),則會出現『Size factors should be positive』此錯誤。 過濾掉這些細胞後,錯誤應該消失了。 但是請注意,我們建議在運行 scDblFinder之前不要進行過於嚴格的過濾。
由於它依賴於人工雙細胞的部分隨機生成,因此對同一數據多次運行scDblFinder會產生略有不同的結果。你可以使用 set.seed() 確保可重複性,但是在多線程時使用 set.seed() 是不行的。請使用以下程序:
如果輸入的sce對象已經包含歸一化矩陣( logcounts)或名為「PCA」的reducedDim數據,scDblFinder將使用它們進行聚類分析。 此外,可以使用 scDblFinder() 函數的 cluster參數手動指定。 通過這種方式,seurat聚類可以例如用於創建人造雙細胞(參見 ?Seurat::as.SingleCellExperiment.Seurat for conversion to SCE)。
在人造雙細胞生成之後,真實和人造雙細胞的計數必須一起重新處理(即歸一化和 PCA),在內部使用scater執行的。 如果您希望以不同的方式執行此步驟,您可以提供自定義函數來執行此操作(請參閱 ?scDblFinder 中的處理參數)。 然而,我們注意到,這一步的變化對雙細胞檢測的影響不大。 事實上,例如,根本不執行任何歸一化會降低雙峰識別的準確性,但也是一點點。
可以,專門處理峰值數據。由於單細胞ATAC-seq數據的稀疏性比轉錄組要大得多,而且scDblFinder需要處理一系列基因,因此使用默認的標準參數,運行的性能較差(執行慢)。因此,我們推薦使用aggregateFeatures=TRUE,這將在正常的scDblFinder 過程之前聚合相似的基因(而不是選擇基因),會產生不錯的結果。如果基因足夠少,我們推薦直接基於距離計算而不是通過SVD步驟獲得,如下所示:
cDblFinder的輸入數據不應包含空液滴,並且可能需要移除覆蓋率非常低的細胞以避免錯誤(例如 200 reads)。
進一步的細胞質控應該在雙細胞識別的下游進行,有兩個理由:
1.默認的預期雙細胞率是根據給定的細胞計算的,如果你排除了很多質量低的細胞,scDblFinder可能會認為雙細胞率應該低於實際值。
2.剔除所有低質量細胞可能會妨礙我們檢測由高質量細胞和低質量細胞組合形成的雙細胞的能力。
話雖如此,這些主要是理論依據,除非你的QC過濾非常嚴格(而且不應該如此!),否則結果不太可能有很大的不同。
scDblFinder運行之前要做一些初次過濾,但不要太嚴格(例如 200 UMI)
質控粗略過濾-運行scDblFinder-較嚴格過濾
後記:
之前在群里看到有人問,雙細胞質控後,拿質控後的數據重新跑scDblFinder,還是會有大量雙細胞被檢出?
這個是必然的,由scDblFinder的演算法決定,它是由輸入數據建立起預測分類模型,不像singleR有另外一套參考數據集,拿輸入數據去map到參考數據集。
只要你餵給scDblFinder數據,它都會給輸入的細胞一個score值,然後設置閾值進行分類,值高的為雙細胞。
scDblFinder強烈依賴輸入數據,有它使用的範疇,所以反覆進行雙細胞scDblFinder質控,用法是不對的。
拿到單細胞轉錄組數據,先質控粗略過濾-運行scDblFinder進行雙細胞質控-較嚴格質控過濾。
其實最好有一個相互驗證的過程。
PYTHON!!!!!!!!!!!!!!!!!!!!!!!
很簡單啊。說下思路
首先打開文件,使用open函數,或者with語句,在網上搜「python 文件」就能搜到
然後把內容讀到列表中,就用read函數,前面搜出來的網頁中應該就有說明
count的話,直接有len函數就能得到,參數使用前面讀到的列表
sum的話,直接sum函數,參數也是剛才的列表
average的話,sum除以count就完了
關於python的幾個小編程 急!
注意:只能在腳本下運行,要直接在IDE下運行需要做修改。
#Q1
text=raw_input(“Type in a line of text:”)
punctuation = [‘(‘, ‘)’, ‘?’, ‘:’, ‘;’, ‘,’, ‘.’, ‘!’, ‘/’, ‘”‘, “‘”]
convers_to_list=list(text) #將字元串轉換成list類型
new_list=[] #存放去掉標點符號後的字元
def move_pun():
for i in convers_to_list:
if i not in punctuation: #不在punctuation中的字元
new_list.append(i) #放進new_list中
new_string=”.join(new_list) #轉換成string
print convers_to_list
print new_string
#運行
move_pun()
________________________________
#Q2
string=raw_input(“Enter a string:”)
length=len(string)
list_of_string=list(string) #將輸入的字元串轉換成列表,以便前後比較
def palindrome():
if length==1: #如果輸入只有一個字元,也把它當做迴文
print “Palindrome? True”
return
for i in range(length/2):
if list_of_string[i]!=list_of_string[length-1-i]: #前後對比,如果不相同就不是迴文,退出
print “Palindrome? False”
return
else:
if i==length/2-1: #直到比較到字元串的中間位置前後都相同,可以判斷是迴文
print “Palindrome? True”
#運行
palindrome()
————————————————————————
#Q3
sentence=raw_input(“Type in a sentence:”)
punctuation = [‘(‘, ‘)’, ‘?’, ‘:’, ‘;’, ‘,’, ‘.’, ‘!’, ‘/’, ‘”‘, “‘”,’ ‘]
convers_to_list=list(sentence)
sentence_no_pun=[]
#將輸入經過第一、第二個程序的處理即可
def sentence_palindrome():
for i in convers_to_list:
if i not in punctuation:
sentence_no_pun.append(i)
print_sentence=”.join(sentence_no_pun).lower() #把list轉換成string再全部轉換成小寫
length=len(print_sentence)
print print_sentence
if length==1:
print “Palindrome? True”
return
for i in range(length/2):
if print_sentence[i]!=print_sentence[length-1-i]:
print “Palindrome? False”
return
else:
if i==length/2-1:
print “Palindrome? True”
#運行
sentence_palindrome()
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/239465.html