from Bio import AlignIO
from Bio import SeqIO
#from StringIO import StringIO
def sample_info(alignment):
sampleDict = {}
sampleCounts = {}
sampleNames = []
for samp in range(1,len(alignment)):
sequence = str(alignment[samp,:].seq).upper()
seqID = alignment[samp,:].id
sampleNames.append(seqID)
counts = int(seqID.split('size=')[1])
sampleDict[seqID]=sequence
sampleCounts[seqID]=counts
return (sampleDict, sampleCounts, sampleNames)
def pos_counts(sampleDict, sampleCounts, sampleNames, alnLength):
posCounts = {}
for pos in range(0,alnLength):
baseDict = {}
for sample in sampleNames:
base = sampleDict[sample][pos]
ct = sampleCounts[sample]
baseDict.setdefault(base,[]).append(ct)
totalCounts = {}
for k,v in baseDict.items():
total = sum(v)
totalCounts[k]=total
posCounts[pos]=totalCounts
return posCounts
def informative_alleles(posCounts, mmaf=0.1):
infAlleles = []
for k,v in posCounts.items():
ctlist = list(v.values())
topCount = max(ctlist)
sumCounts = sum(ctlist)
for key,value in v.items():
if value != topCount:
if (value/sumCounts) > mmaf:
infAlleles.append(k)
return infAlleles
def informative_bases(infAlleles, posCounts):
infBases = {}
for i in infAlleles:
baseList = list(posCounts[i].keys())
bases = set(baseList)
infBases[i]=bases
return infBases
def raw_haplotypes(sampleDict, sampleCounts, infAlleles, infBases):
haplotypes = {} # hap:count
for k,v in sampleDict.items():
hapStr = ''
for i in infAlleles:
base = v[i]
if base in infBases[i]:
hapStr += base
else:
hapStr += '-'
haplotypes.setdefault(hapStr, []).append(sampleCounts[k])
return haplotypes
def major_haplotypes(haplotypes, mhf=0.1):
"""mhf == major haplotype frequency"""
hapTotals = {}
for k,v in haplotypes.items():
total = sum(v)
hapTotals[k]=total
totalFreq = sum(hapTotals.values())
major_haps = []
for key,value in hapTotals.items():
if value/totalFreq >= mhf:
major_haps.append(key)
return major_haps
alnFile = '/Users/hughcross/OneDrive/AnatAnalysis/Paua/find_haplotypes/aln_dr10_Sample-10_7_1wRef.fasta'
aln1 = AlignIO.read(alnFile,'fasta')
len(aln1)
aln1.get_alignment_length()
aln1_info = sample_info(aln1)
sampleNames = aln1_info[2]
sampleNames[0]
aln1_pos_counts = pos_counts(aln1_info[0], aln1_info[1], aln1_info[2], 138)
aln1_pos_counts[1]
aln1_infA = informative_alleles(aln1_pos_counts)
aln1_infA
aln1_infB = informative_bases(aln1_infA, aln1_pos_counts)
aln1_infB
aln1_rh = raw_haplotypes(aln1_info[0], aln1_info[1], aln1_infA, aln1_infB)
len(aln1_rh['CAGGAT'])
aln1_mh = major_haplotypes(aln1_rh)
aln1_mh
def output_files(prefix, major_haplotypes, informative_alleles):
positionFileName = prefix+'_positions.txt'
haplotypeFileName = prefix+'_haplotypes.txt'
posout = open(positionFileName, 'w')
hapout = open(haplotypeFileName, 'w')
counter = 1
last_hap = major_haplotypes[-1]
last_pos = informative_alleles[-1]
for hap in major_haplotypes:
if hap == last_hap:
hapout.write(str(counter)+'\t'+hap)
else:
hapout.write(str(counter)+'\t'+hap+'\n')
counter += 1
for p in informative_alleles:
if p == last_pos:
posout.write(str(p+1))
else:
posout.write(str(p+1)+'\n')
posout.close()
hapout.close()
output_files('test1', aln1_mh, aln1_infA)
for p in range(0,138):
if aln1[0,p] == '-':
continue
else:
startpoint = p
break
print(startpoint)
for e in range(1,138):
if aln1[0,-e] == '-':
continue
else:
endpoint = e-1
break
print(endpoint)
alng = aln1[:,1:-1]
alng.get_alignment_length()
alng[1,0]
def global_adjust(alignment):
"""this assumes that the reference is the first sequence"""
alnLength = alignment.get_alignment_length()
for p in range(0,alnLength):
if alignment[0,p] == '-':
continue
else:
startpoint = p
break
for e in range(1,alnLength):
if alignment[0,-e] == '-':
continue
else:
endpoint = e-1
break
new_alignment = alignment[:,startpoint:-endpoint]
return new_alignment
aln2 = global_adjust(aln1)
aln2.get_alignment_length()
print(aln1[0,10])
print(aln2[0,10])
aln2.get_alignment_length()
aln2_info = sample_info(aln2)
sampleNames = aln2_info[2]
sampleNames[0]
aln2_pos_counts = pos_counts(aln2_info[0], aln2_info[1], aln2_info[2], 136)
aln2_pos_counts[10]
aln2_infA = informative_alleles(aln2_pos_counts)
aln2_infA
aln1_infB = informative_bases(aln1_infA, aln1_pos_counts)
aln1_infB
aln1_rh = raw_haplotypes(aln1_info[0], aln1_info[1], aln1_infA, aln1_infB)
len(aln1_rh['CAGGAT'])
aln1_mh = major_haplotypes(aln1_rh)
aln1_mh