python用map-reduce(IP地址库匹配省份和城市)

IP地址库文件为city.txt大致内容如下: (在HDFS/data139/ods/ip_address/city.txt)  708100096|708104191|辽宁|抚顺
708104192|708112383|辽宁|葫芦岛
708112384|708116479|辽宁|朝阳
708116480|708124671|辽宁|营口
708124672|708132863|辽宁|阜新
708132864|708141055|辽宁|阜新
708141056|708145151|辽宁|锦州
708145152|708149247|辽宁|葫芦岛
708149248|708153343|辽宁|葫芦岛
708153344|708157439|辽宁|盘锦
708157440|708161535|辽宁|盘锦
708161536|708165631|辽宁|朝阳
708165632|708182015|辽宁|沈阳


经过数据清洗后得出真实的手机号码并插入当前hive表中

INSERT overwrite table true_flase_phone_month select loginname,ipcount,user_ip,loginname regexp '^((13[0-9])|(14[5|7])|(15([0-3]|[5-9]))|(18[0,5-9]))\\d{8}$',user_ip regexp '^(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])$' from log_login_info_desc_month;
CREATE TABLE true_phone_month
( 
     loginname STRING, 
     ipcount   BIGINT, 
     user_ip   STRING
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
INSERT overwrite table true_phone_month             
select  loginname,ipcount,user_ip  from true_flase_phone_month where  login_true_false='true' and user_ip_true_false='true';


python中map和reduce代码如下
---map
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
import re
import socket
import struct
import os  
import gc

i=1
filepath = os.environ['mapreduce_map_input_file']
    
filename = os.path.split(filepath)[-1]
for line in sys.stdin:
    if line.strip()=="":
        continue    
    if filename == 'city.txt':
        fields = line[:-1].split("|")    
        ip_1 = fields[0]  
        ip_2 = fields[1]  
        prov = fields[2]  
        area = fields[3]       
        print ("%s|%s|%s|%s|%s" % (ip_1,1,i,prov,area))
        print ("%s|%s|%s|%s|%s" % (ip_2,1,i,prov,area))
        i+=1
    else:
        fields = line[:-1].split(",")    
        try:            
            logon_no=fields[0]
            logon_cnt=fields[1]
            logon_ip=fields[2]
            if logon_cnt and logon_no and logon_ip:
                ip_s=socket.ntohl(struct.unpack("I",socket.inet_aton(str(logon_ip)))[0])                  
                print ("%s|%s|%s|%s" % (ip_s,0,logon_no,logon_cnt))
        except:
            continue
---reduce
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
prov=0
area=0
ip_no1=0
ip_no2=0
for line in sys.stdin:
    if line.strip()=="":
        continue
    a=line[:-1].split("|")  
    try:
        if int(a[1])==1:            
            ip_no1=a[2]
            if ip_no1==ip_no2:
                prov=0
                area=0
            else:
                prov=a[3].strip()
                area=a[4].strip()
            ip_no2=ip_no1
            
        if int(a[1])==0:
            logon_time=a[3].strip()
            logon_no=a[2].strip()
            print ("%s|%s|%s|%s|%s" % (logon_time,logon_no,prov,area,a[0].strip()))
    except:
        pass

hadoop jar /usr/hdp/2.5.0.0-1245/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options="-n" -D mapred.map.tasks=10 -D mapred.reduce.tasks=1 -input /data139/ods/ip_address/city.txt -input /apps/hive/warehouse/dw_ods.db/true_phone_month/* -output /data139/bill_tmp/ip_change/${month_start} -file /home/hdfs/ip_change_script/my_map.py -file /home/hdfs/ip_change_script/my_reduce.py -mapper "python my_map.py" -reducer "python my_reduce.py"
 
Administrator

知人不必言尽,留三分余地与人,留些口德与己。 责人不必苛尽,留三分余地与人,留些肚量与己。 才能不必傲尽,留三分余地与人,留些内涵与己。 锋芒不必露尽,留三分余地与人,留些深敛与己。 有功不必邀尽,留三分余地与人,留些谦让与己。

发表评论

电子邮件地址不会被公开。 必填项已用*标注